
Java BlockingQueue Example

In this example we will discuss the java.util.concurrent.BlockingQueue interface. BlockingQueue was added in Java 1.5 along with the other classes and interfaces of the java.util.concurrent package. But what is a BlockingQueue, and how does it differ from a plain java.util.Queue? How can we use BlockingQueues? These questions are answered in the following sections, along with a simple example of BlockingQueue usage.

1. What is a BlockingQueue?

BlockingQueue is a thread-safe queue: multiple threads can insert and retrieve elements without external synchronization. It also provides operations that block a thread trying to insert an element while the queue is full, or trying to remove an element while the queue is empty, with the option to give up after a specified timeout. This makes BlockingQueue a natural fit for the Producer-Consumer pattern: the producing thread can insert elements until the queue reaches its capacity, and the consuming thread can retrieve elements until the queue is empty, with each side blocking whenever it cannot proceed.
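To make the timeout behaviour described above concrete, here is a minimal sketch. The class name BlockingWithTimeoutSketch, the capacity of 1 and the 200 ms timeout are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
public class BlockingWithTimeoutSketch {

	// Tries to insert into a queue that is already full, waiting at most 200 ms.
	public static boolean offerToFullQueue() throws InterruptedException {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity of one element
		queue.put("first"); // the queue is now full
		// Nobody removes an element, so the timed offer gives up and returns false.
		return queue.offer("second", 200, TimeUnit.MILLISECONDS);
	}

	// Tries to remove from an empty queue, waiting at most 200 ms.
	public static String pollFromEmptyQueue() throws InterruptedException {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
		// Nobody inserts an element, so the timed poll gives up and returns null.
		return queue.poll(200, TimeUnit.MILLISECONDS);
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println("offer on full queue: " + offerToFullQueue());
		System.out.println("poll on empty queue: " + pollFromEmptyQueue());
	}
}
```

Without the timeout arguments, the equivalent blocking calls put(e) and take() would wait indefinitely in both situations.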

2. Queues vs BlockingQueues

java.util.Queue is an interface that extends the Collection interface and provides methods for inserting, removing and inspecting elements. A standard queue usually orders its elements First-In-First-Out (FIFO); an alternative is Last-In-First-Out (LIFO) ordering. BlockingQueue extends Queue, so it offers the same operations, but its implementations are thread-safe and it adds blocking operations, which a general-purpose Queue implementation such as java.util.LinkedList does not provide. This is why BlockingQueues are usually preferable for concurrent development.
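The relationship between the two interfaces can be sketched as follows. Since BlockingQueue is a subinterface of Queue, the same helper method accepts both. The class and method names are hypothetical, and ArrayDeque and LinkedBlockingQueue are just one possible choice of implementations.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration.
public class QueueVsBlockingQueueSketch {

	// Inserts 1, 2, 3 into any Queue and drains it, returning the removal order.
	public static String fillAndDrain(Queue<Integer> queue) {
		queue.offer(1);
		queue.offer(2);
		queue.offer(3);
		StringBuilder order = new StringBuilder();
		Integer head;
		// poll() never blocks: it returns null as soon as the queue is empty.
		while ((head = queue.poll()) != null) {
			order.append(head);
		}
		return order.toString();
	}

	public static void main(String[] args) {
		Queue<Integer> plain = new ArrayDeque<>();                     // not thread-safe
		BlockingQueue<Integer> blocking = new LinkedBlockingQueue<>(); // thread-safe

		System.out.println(fillAndDrain(plain));    // FIFO order: 123
		System.out.println(fillAndDrain(blocking)); // FIFO order: 123
	}
}
```

Both queues behave identically in a single thread; the difference only shows once several threads share the queue or a caller needs to wait for an element.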

3. BlockingQueue methods and implementations

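The classes that implement the BlockingQueue interface are available in the java.util.concurrent package and include the following:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue (added in Java 7)
  • PriorityBlockingQueue
  • SynchronousQueue

For more information on each of the above classes, you can visit the respective javadoc.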
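As a rough illustration of how implementations can differ, the sketch below compares three of them: ArrayBlockingQueue (bounded), LinkedBlockingQueue (unbounded when created with its no-argument constructor) and PriorityBlockingQueue (ordered by priority rather than insertion order). The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical class name, used only for this illustration.
public class ImplementationsSketch {

	// A bounded queue of capacity 2 holding one element has room for one more.
	public static int boundedRemaining() {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
		queue.offer(1);
		return queue.remainingCapacity();
	}

	// Without an explicit capacity, LinkedBlockingQueue is bounded only by Integer.MAX_VALUE.
	public static int unboundedRemaining() {
		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
		return queue.remainingCapacity();
	}

	// PriorityBlockingQueue hands out the smallest element first (natural ordering).
	public static int priorityHead() {
		BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
		queue.offer(30);
		queue.offer(10);
		queue.offer(20);
		return queue.poll();
	}

	public static void main(String[] args) {
		System.out.println("ArrayBlockingQueue remaining: " + boundedRemaining());     // 1
		System.out.println("LinkedBlockingQueue remaining: " + unboundedRemaining());  // 2147483647
		System.out.println("PriorityBlockingQueue head: " + priorityHead());           // 10
	}
}
```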

BlockingQueue also provides methods for inserting, removing and examining elements. They fall into four categories, depending on how they handle an operation that cannot be satisfied immediately, for example when a thread tries to insert an element into a full queue or remove an element from an empty queue. Methods in the first category throw an exception, those in the second return a special value (null or false), those in the third block the thread until the operation can succeed, and those in the fourth block the thread for at most a given time limit before giving up. These methods are summarized below:

  • Methods related to insertion
    1. Throws exception: add(e)
    2. Special value: offer(e)
    3. Blocks: put(e)
    4. Times out: offer(e, time, unit)
  • Methods related to removal
    1. Throws exception: remove()
    2. Special value: poll()
    3. Blocks: take()
    4. Times out: poll(time, unit)
  • Methods related to examination
    1. Throws exception: element()
    2. Special value: peek()
    3. Blocks: not applicable
    4. Times out: not applicable
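The categories above can be observed with a short sketch. It assumes an ArrayBlockingQueue of capacity 1 and a 100 ms timeout; the class and method names are hypothetical. The blocking methods put(e) and take() are deliberately not called on a full or empty queue here, because they would wait forever in a single thread.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
public class MethodCategoriesSketch {

	// Calls the non-blocking and timed insertion methods on a full queue.
	public static String insertIntoFullQueue() throws InterruptedException {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
		queue.put(1); // succeeds immediately and fills the queue
		StringBuilder result = new StringBuilder();
		try {
			queue.add(2); // category 1: throws when the queue is full
		} catch (IllegalStateException e) {
			result.append("add threw IllegalStateException; ");
		}
		result.append("offer returned ").append(queue.offer(2)).append("; "); // category 2
		result.append("timed offer returned ")
				.append(queue.offer(2, 100, TimeUnit.MILLISECONDS)); // category 4
		return result.toString();
	}

	// Calls the non-blocking and timed removal and examination methods on an empty queue.
	public static String readFromEmptyQueue() throws InterruptedException {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
		StringBuilder result = new StringBuilder();
		try {
			queue.remove(); // category 1: throws when the queue is empty
		} catch (NoSuchElementException e) {
			result.append("remove threw; ");
		}
		try {
			queue.element(); // category 1: throws when the queue is empty
		} catch (NoSuchElementException e) {
			result.append("element threw; ");
		}
		result.append("poll=").append(queue.poll()).append("; "); // category 2
		result.append("timed poll=")
				.append(queue.poll(100, TimeUnit.MILLISECONDS)).append("; "); // category 4
		result.append("peek=").append(queue.peek()); // category 2
		return result.toString();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(insertIntoFullQueue());
		System.out.println(readFromEmptyQueue());
	}
}
```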

4. BlockingQueue example

In this section we will show a simple example using BlockingQueue and the ArrayBlockingQueue implementation of the BlockingQueue interface.

First, create a Java file named BlockingQueueExample.java with the following code:

BlockingQueueExample.java

package com.javacodegeeks.java.util.concurrent.blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {

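	public static void main(String[] args) throws Exception {

		// Bounded queue shared by the producer and the consumer.
		BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(1000);

		Producer producer = new Producer(bq);
		Consumer consumer = new Consumer(bq);

		new Thread(producer).start();
		new Thread(consumer).start();

		// Not strictly required: the JVM stays alive until both
		// (non-daemon) threads have finished.
		Thread.sleep(4000);
	}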

}

Then, create a Java file named Producer.java with the following code:

Producer.java

package com.javacodegeeks.java.util.concurrent.blockingqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

	private BlockingQueue<Integer> bq;

	public Producer(BlockingQueue<Integer> queue) {
		this.setBlockingQueue(queue);
	}

	public void run() {

		Random rand = new Random();
		try {
			// Produce three values, pausing one second between them.
			for (int i = 0; i < 3; i++) {
				int res = addition(rand.nextInt(100), rand.nextInt(50));
				System.out.println("Produced: " + res);
				bq.put(res); // blocks if the queue is full
				if (i < 2) {
					Thread.sleep(1000);
				}
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag and let the thread finish.
			Thread.currentThread().interrupt();
		}
	}

	public void setBlockingQueue(BlockingQueue<Integer> bq) {
		this.bq = bq;
	}

	// Returns the sum of the two arguments.
	public int addition(int x, int y) {
		return x + y;
	}

}

Finally, create a Java file named Consumer.java with the following code:

Consumer.java

package com.javacodegeeks.java.util.concurrent.blockingqueue;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

	protected BlockingQueue<Integer> queue;

	public Consumer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			// Consume the three produced values; take() blocks while the queue is empty.
			for (int i = 0; i < 3; i++) {
				System.out.println("Consumed: " + queue.take());
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag and let the thread finish.
			Thread.currentThread().interrupt();
		}
	}
}

Let’s explain the above code. First, in the BlockingQueueExample class, we start a Producer and a Consumer in separate threads. The Producer adds two random integers (one between 0 and 99, the other between 0 and 49) and puts the sum into a shared BlockingQueue. The Consumer takes the integers and prints them to the output. The Consumer uses the take() method, which retrieves and removes the head of the queue and, if no element is available, blocks until one becomes available.
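The blocking behaviour of take() can be observed directly with a small timing sketch. It is not part of the example project: the class name, the value 42 and the 300 ms delay are assumptions made for illustration, and the measured time will vary from run to run.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this illustration.
public class TakeBlocksSketch {

	// Starts a producer that waits delayMillis before putting a value, calls take()
	// on the calling thread, and returns how long the caller waited in milliseconds.
	public static long blockedMillis(long delayMillis) throws InterruptedException {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

		Thread producer = new Thread(() -> {
			try {
				Thread.sleep(delayMillis); // simulate a slow producer
				queue.put(42);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});

		long start = System.nanoTime();
		producer.start();
		queue.take(); // the queue is empty, so this blocks until the producer calls put()
		long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

		producer.join();
		return elapsedMillis;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println("take() waited about " + blockedMillis(300) + " ms");
	}
}
```

The reported wait should be roughly the producer's delay, which shows that the consuming thread was parked inside take() rather than spinning or returning null.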

If we run the above code, we will get output similar to the following (the numbers are random, so they will differ between runs):

  • Output:
Produced: 93
Consumed: 93
Produced: 69
Consumed: 69
Produced: 76
Consumed: 76

5. Download the source code

This was an example of how to use the BlockingQueue interface. Download the Eclipse project from here: BlockingQueueExample.zip

Konstantina Dimtsa

Konstantina has graduated from the Department of Informatics and Telecommunications in National and Kapodistrian University of Athens (NKUA) and she is currently pursuing M.Sc studies in Advanced Information Systems at the same department. She is also working as a research associate for NKUA in the field of telecommunications. Her main interests lie in software engineering, web applications, databases and telecommunications.