
java.util.concurrent.locks.Condition Example

In this tutorial we discuss the Condition interface in Java. A Condition object, also known as a condition variable, allows a thread to suspend its execution until some condition on shared state becomes true. A Condition is always bound to a Lock and is obtained by calling that lock's newCondition() method.

Furthermore, a Condition provides the effect of having multiple wait-sets per object, by combining these sets with the use of a Lock implementation. Because the state a thread waits on is shared among different threads, it must be protected, which is why the usage of a Lock is mandatory. The key property of waiting on a Condition is that it atomically releases the associated Lock and suspends the current thread's execution; the lock is re-acquired before the waiting method returns.
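
The relationship between a Lock and its Condition can be illustrated with a minimal sketch. The ReadyGate class and its method names are hypothetical and are not part of the tutorial's project; the sketch only assumes the standard ReentrantLock implementation. One thread waits until a shared flag becomes true, and another thread sets the flag and signals.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class, used only to illustrate the Lock/Condition pairing.
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    // The Condition is created from, and stays bound to, this lock.
    private final Condition ready = lock.newCondition();
    // Shared state; it is read and written only while holding the lock.
    private boolean isReady = false;

    public void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // await() atomically releases the lock and suspends the thread.
            // The lock is held again when await() returns, so the flag can be re-checked safely.
            while (!isReady) {
                ready.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public void markReady() {
        lock.lock();
        try {
            isReady = true;
            // Wake up every thread waiting for the flag.
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadyGate gate = new ReadyGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                System.out.println("Waiter resumed");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        waiter.join();
    }
}
```

Because the flag is checked in a loop while holding the lock, the sketch behaves correctly whether markReady() runs before or after the waiter starts waiting.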

Finally, the Condition interface has been available since version 1.5 of Java.

The structure of the Condition interface

Methods

  • await()
    The current thread suspends its execution until it is signalled or interrupted.

  • await(long time, TimeUnit unit)
    The current thread suspends its execution until it is signalled, interrupted, or the specified waiting time elapses.

  • awaitNanos(long nanosTimeout)
    The current thread suspends its execution until it is signalled, interrupted, or the specified number of nanoseconds elapses.

  • awaitUninterruptibly()
    The current thread suspends its execution until it is signalled (cannot be interrupted).

  • awaitUntil(Date deadline)
    The current thread suspends its execution until it is signalled, interrupted, or the specified deadline passes.

  • signal()
    This method wakes up one thread waiting on this condition.

  • signalAll()
    This method wakes up all threads waiting on this condition.
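
The timed variants also report whether the wait ran out of time: await(long, TimeUnit) returns false if the waiting time elapsed, and awaitNanos returns an estimate of the nanoseconds still remaining. The following sketch shows one common way to use awaitNanos inside a loop so that repeated wake-ups do not extend the total timeout. The TimedFlag class is hypothetical and exists only for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a flag that can be awaited with a timeout.
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition flagSet = lock.newCondition();
    private boolean flag = false;

    // Returns true if the flag was set within the timeout, false otherwise.
    public boolean awaitFlag(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // the time budget is used up
                }
                // awaitNanos returns an estimate of the time still remaining,
                // which becomes the budget for the next iteration.
                nanos = flagSet.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void set() {
        lock.lock();
        try {
            flag = true;
            flagSet.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag timedFlag = new TimedFlag();
        // Nobody sets the flag, so the wait times out.
        System.out.println(timedFlag.awaitFlag(50, TimeUnit.MILLISECONDS)); // prints "false"
        timedFlag.set();
        // The flag is already set, so the call returns without waiting.
        System.out.println(timedFlag.awaitFlag(50, TimeUnit.MILLISECONDS)); // prints "true"
    }
}
```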

Conditions in Java

As described above, a Condition lets a thread wait until it is notified that some condition holds. A classic example that demonstrates the usage of Conditions is the producer-consumer problem. In this model, one thread produces a number of items and places them into a shared queue, while another thread consumes these items by removing them from the shared queue.

Important: The model supports multiple producers and consumers, but in this example we demonstrate the simple case of one producer and one consumer.

In addition, because the shared queue is accessed by multiple threads, it must be properly synchronized. Our implementation of the shared queue is shown below:

SharedFiFoQueue.java:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SharedFiFoQueue {

	private Object[] elems = null;
	private int current = 0;
	private int placeIndex = 0;
	private int removeIndex = 0;
	
	private final Lock lock = new ReentrantLock();
	private final Condition isEmpty = lock.newCondition();
	private final Condition isFull = lock.newCondition();
	
	public SharedFiFoQueue(int capacity) {
		this.elems = new Object[capacity];
	}
	
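	public void add(Object elem) throws InterruptedException {
		lock.lock();
		try {
			//Wait while the queue is full; the loop re-checks the state after every wake-up.
			while(current >= elems.length)
				isFull.await();
	
			elems[placeIndex] = elem;
		
			//We need the modulo, in order to avoid going out of bounds.
			placeIndex = (placeIndex + 1) % elems.length;
		
			++current;
		
			//Notify the consumer that there is data available.
			isEmpty.signal();
		}
		finally {
			//Always release the lock, even if await() throws an InterruptedException.
			lock.unlock();
		}
	}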

	public Object remove() throws InterruptedException {
		Object elem = null;
		
		lock.lock();
		try {
			//Wait while the queue is empty; the loop re-checks the state after every wake-up.
			while(current <= 0)
				isEmpty.await();
	
			elem = elems[removeIndex];

			//We need the modulo, in order to avoid going out of bounds.
			removeIndex = (removeIndex + 1) % elems.length;
		
			--current;
		
			//Notify the producer that there is space available.
			isFull.signal();
		}
		finally {
			//Always release the lock, even if await() throws an InterruptedException.
			lock.unlock();
		}
		
		return elem;
	}
}

The SharedFiFoQueue class contains a private array of elements with a fixed capacity. It supports two methods, add and remove, which add an element to the queue and remove an element from it, respectively. Both methods first acquire the lock. The add method then waits on the isFull condition while the queue is full, and the remove method waits on the isEmpty condition while the queue is empty. Each wait sits inside a while loop, so the state of the queue is checked again after every wake-up. Finally, before the lock is released, each method signals one thread waiting on the other condition: add signals a waiting consumer and remove signals a waiting producer.

Producer.java:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Producer extends Thread {
	private final static String FILENAME = "input.txt";
	private final SharedFiFoQueue queue;
	
	public Producer(SharedFiFoQueue queue) {
		this.queue = queue;
	}
	
	@Override
	public void run() {
		BufferedReader rd = null;
		
		try {
			rd = new BufferedReader(new FileReader(FILENAME));
			
			String inputLine = null;
			while((inputLine = rd.readLine()) != null) {
				String[] inputWords = inputLine.split(" ");
				
				for(String inputWord: inputWords)
					queue.add(inputWord);
			}
			
			//Terminate the execution.
			queue.add(null);
		}
		catch (InterruptedException ex) {
			System.err.println("An InterruptedException was caught: " + ex.getMessage());
			ex.printStackTrace();
		}
		catch (IOException ex) {
			System.err.println("An IOException was caught: " + ex.getMessage());
			ex.printStackTrace();
		}
		finally {
			try {
				if(rd != null)
					rd.close();
			}
			catch (IOException ex) {
				System.err.println("An IOException was caught: " + ex.getMessage());
				ex.printStackTrace();
			}
		}
	}
}

The Producer class reads the contents of the specified file, line by line. Each line is split into separate words and each word is placed into the shared queue. Once the file has been completely read, a special null object is placed into the queue, in order to notify the consumer that no more elements will be placed into the queue.

Consumer.java:

import java.util.HashSet;
import java.util.Set;

public class Consumer extends Thread {
	private final Set<Object> seenObjects = new HashSet<Object>();
	private int total = 0;
	private final SharedFiFoQueue queue;

	public Consumer(SharedFiFoQueue queue) {
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try {
			do {
				Object obj = queue.remove();
				if(obj == null)
					break;
				
				if(!seenObjects.contains(obj)) {
					++total;
					seenObjects.add(obj);
				}
				
				System.out.println("[Consumer] Read the element: " + obj.toString());
				
			} while(true);
		}
		catch (InterruptedException ex) {
			System.err.println("An InterruptedException was caught: " + ex.getMessage());
			ex.printStackTrace();
		}
		
		System.out.println("\n[Consumer] " + total + " distinct words have been read...");
	}
}

The Consumer class repeatedly reads elements from the shared queue, until the special null object is received. The Consumer class also counts the number of distinct words received from the producer.

ConditionExample.java:

public class ConditionExample {
	public static void main(String[] args) throws InterruptedException {
		SharedFiFoQueue sharedQueue = new SharedFiFoQueue(10);
		
		//Create a producer and a consumer.
		Thread producer = new Producer(sharedQueue);
		Thread consumer = new Consumer(sharedQueue);
		
		//Start both threads.
		producer.start();
		consumer.start();
		
		//Wait for both threads to terminate.
		producer.join();
		consumer.join();
	}
}

In a sample main method, we create one producer and one consumer that share the same queue, start both threads, and wait for them to terminate. A sample execution is shown below:

[Consumer] Read the element: Lorem
[Consumer] Read the element: ipsum
[Consumer] Read the element: dolor
[Consumer] Read the element: sit
...
[Consumer] Read the element: in
[Consumer] Read the element: est.

[Consumer] 80 distinct words have been read...
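
As noted earlier, the model also supports several producers and consumers. The sketch below illustrates this with a compact stand-in for the shared queue. The MultiThreadDemo and IntBuffer classes, the int-only storage, and the -1 end marker are all assumptions made for brevity and are not part of the tutorial's project. Each consumer stops when it takes one end marker, so the main thread adds one marker per consumer after all producers have finished.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class MultiThreadDemo {
    // Hypothetical int-only bounded buffer with the same two-condition design.
    static class IntBuffer {
        private final int[] elems;
        private int count, putIndex, takeIndex;
        private final Lock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

        IntBuffer(int capacity) { elems = new int[capacity]; }

        void put(int value) throws InterruptedException {
            lock.lock();
            try {
                while (count == elems.length) notFull.await();
                elems[putIndex] = value;
                putIndex = (putIndex + 1) % elems.length;
                count++;
                notEmpty.signal(); // one waiting consumer may proceed
            } finally {
                lock.unlock();
            }
        }

        int take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) notEmpty.await();
                int value = elems[takeIndex];
                takeIndex = (takeIndex + 1) % elems.length;
                count--;
                notFull.signal(); // one waiting producer may proceed
                return value;
            } finally {
                lock.unlock();
            }
        }
    }

    // Each producer adds 1..itemsPerProducer; returns the sum of all consumed values.
    static long run(int producers, int consumers, int itemsPerProducer) throws InterruptedException {
        IntBuffer buffer = new IntBuffer(4);
        AtomicLong sum = new AtomicLong();
        Thread[] producerThreads = new Thread[producers];
        Thread[] consumerThreads = new Thread[consumers];
        for (int p = 0; p < producers; p++) {
            producerThreads[p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerProducer; i++) buffer.put(i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads[p].start();
        }
        for (int c = 0; c < consumers; c++) {
            consumerThreads[c] = new Thread(() -> {
                try {
                    int value;
                    // -1 is the end marker assumed by this sketch.
                    while ((value = buffer.take()) != -1) sum.addAndGet(value);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads[c].start();
        }
        for (Thread t : producerThreads) t.join();
        // All real items are queued by now; add one end marker per consumer.
        for (int c = 0; c < consumers; c++) buffer.put(-1);
        for (Thread t : consumerThreads) t.join();
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Two producers each add 1..100, so the consumed sum is 2 * 5050.
        System.out.println(run(2, 2, 100)); // prints "10100"
    }
}
```

Using signal() rather than signalAll() is sufficient here because each condition has only one kind of waiter: every thread waiting on notEmpty is a consumer and every thread waiting on notFull is a producer.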

Download the Eclipse Project

This was a tutorial about the Condition interface in Java.

Download
You can download the full source code of this example here: ConditionExamples.zip.

Sotirios-Efstathios Maneas

Sotirios-Efstathios (Stathis) Maneas is a PhD student at the Department of Computer Science at the University of Toronto. His main interests include distributed systems, storage systems, file systems, and operating systems.
Madhavi
6 years ago

Producer and Consumer constructors were showing error because of the ‘queue’ instance variable declared as ‘final’.
private final SharedFiFoQueue queue;
