java.util.concurrent.locks.Condition Example
In this tutorial we discuss the Condition interface in Java. A Condition object, also known as a condition variable, lets a thread suspend its execution until some condition on shared state becomes true. A Condition is always bound to a Lock and is obtained by calling the lock's newCondition() method.
Furthermore, a Condition gives the effect of having multiple wait-sets per object, by combining these sets with a Lock implementation. Because the state a thread waits on is shared among different threads, it must be protected by the Lock, so a thread must hold the lock before it waits on or signals a Condition. It is important to mention that waiting on a Condition atomically releases the associated Lock and suspends the current thread; the lock is re-acquired before the waiting method returns.
Finally, the Condition interface has been available since Java 1.5.
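The idiom described above can be sketched as follows. This is a minimal illustration rather than part of the original example: the ReadyGate class and its methods are hypothetical names. It shows the guarded wait in a while loop (await() may return without the condition being true, so the state is re-checked) and the lock being released in a finally block.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate, used only to illustrate the await/signal idiom.
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean isReady = false; // shared state, guarded by lock

    // Blocks the caller until markReady() has been called.
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // Re-check the state in a loop: await() may wake up spuriously.
            while (!isReady) {
                ready.await(); // atomically releases the lock and suspends
            }
        } finally {
            lock.unlock();
        }
    }

    // Changes the shared state and wakes every waiting thread.
    void markReady() {
        lock.lock();
        try {
            isReady = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if a waiting thread was released after markReady().
    static boolean demo() {
        ReadyGate gate = new ReadyGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Waiter released: " + demo());
    }
}
```

Because the flag is checked under the lock before waiting, the sketch behaves the same whether markReady() runs before or after the waiter starts waiting.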
The structure of the Condition interface
Methods
await(): The current thread suspends its execution until it is signalled or interrupted.
await(long time, TimeUnit unit): The current thread suspends its execution until it is signalled, interrupted, or the specified amount of time elapses.
awaitNanos(long nanosTimeout): The current thread suspends its execution until it is signalled, interrupted, or the specified number of nanoseconds elapses.
awaitUninterruptibly(): The current thread suspends its execution until it is signalled (cannot be interrupted).
awaitUntil(Date deadline): The current thread suspends its execution until it is signalled, interrupted, or the specified deadline elapses.
signal(): This method wakes one thread waiting on this condition.
signalAll(): This method wakes all threads waiting on this condition.
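As an illustration of the timed variants, the sketch below waits with awaitNanos(), which returns an estimate of the remaining wait time so that the loop can give up once the timeout is used up. The TimedGate class and its method names are hypothetical and not part of the original example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical gate that can be waited on with a timeout.
class TimedGate {
    private final Lock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open = false; // shared state, guarded by lock

    // Returns true if the gate is open, false if the timeout elapsed first.
    boolean awaitOpen(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!open) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                // awaitNanos returns an estimate of the time still left to wait.
                nanos = opened.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedGate gate = new TimedGate();
        // Nobody has opened the gate yet, so this wait times out.
        System.out.println(gate.awaitOpen(50, TimeUnit.MILLISECONDS));
        gate.open();
        // The gate is open now, so this call returns without waiting.
        System.out.println(gate.awaitOpen(50, TimeUnit.MILLISECONDS));
    }
}
```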
Conditions in Java
As we have already described, Conditions are used to notify a thread when a condition becomes true. One fundamental example that demonstrates the usage of Conditions is the producer-consumer example. According to this model, one thread produces a number of items and places them in a shared queue, while another thread consumes these items by removing them from the shared queue.
Important: The model supports multiple producers and consumers, but in this example we demonstrate the simple case of one producer and one consumer.
In addition, the shared queue is accessed by multiple threads and thus must be properly synchronized. Our implementation of the shared queue is shown below:
SharedFiFoQueue.java:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SharedFiFoQueue {

    private final Object[] elems;
    private int current = 0;     // number of elements currently in the queue
    private int placeIndex = 0;  // index where the next element is stored
    private int removeIndex = 0; // index of the next element to remove

    private final Lock lock = new ReentrantLock();
    // Consumers wait on isEmpty while the queue is empty.
    private final Condition isEmpty = lock.newCondition();
    // Producers wait on isFull while the queue is full.
    private final Condition isFull = lock.newCondition();

    public SharedFiFoQueue(int capacity) {
        this.elems = new Object[capacity];
    }

    public void add(Object elem) throws InterruptedException {
        lock.lock();
        try {
            while (current >= elems.length)
                isFull.await();

            elems[placeIndex] = elem;

            // We need the modulo, in order to avoid going out of bounds.
            placeIndex = (placeIndex + 1) % elems.length;
            ++current;

            // Notify the consumer that there is data available.
            isEmpty.signal();
        } finally {
            // Release the lock even if await() throws InterruptedException.
            lock.unlock();
        }
    }

    public Object remove() throws InterruptedException {
        lock.lock();
        try {
            while (current <= 0)
                isEmpty.await();

            Object elem = elems[removeIndex];

            // We need the modulo, in order to avoid going out of bounds.
            removeIndex = (removeIndex + 1) % elems.length;
            --current;

            // Notify the producer that there is space available.
            isFull.signal();
            return elem;
        } finally {
            // Release the lock even if await() throws InterruptedException.
            lock.unlock();
        }
    }
}
The SharedFiFoQueue class contains a private array of elements with a fixed capacity. It supports two methods, add and remove, which add an element to the queue and remove an element from it, respectively. In both methods, the lock is acquired first. Then add waits while the queue is full before inserting the element, and correspondingly remove waits while the queue is empty before removing one. Finally, before the lock is released, each method signals a thread waiting on the opposite condition.
Producer.java:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class Producer extends Thread {

    private final static String FILENAME = "input.txt";
    private final SharedFiFoQueue queue;

    public Producer(SharedFiFoQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // try-with-resources closes the reader automatically.
        try (BufferedReader rd = new BufferedReader(new FileReader(FILENAME))) {
            String inputLine;
            while ((inputLine = rd.readLine()) != null) {
                String[] inputWords = inputLine.split(" ");
                for (String inputWord : inputWords)
                    queue.add(inputWord);
            }

            // Terminate the execution.
            queue.add(null);
        } catch (InterruptedException ex) {
            System.err.println("An InterruptedException was caught: " + ex.getMessage());
            ex.printStackTrace();
        } catch (IOException ex) {
            System.err.println("An IOException was caught: " + ex.getMessage());
            ex.printStackTrace();
        }
    }
}
The Producer class reads the contents of the specified file line by line. Each line is split into separate words and each word is placed into the shared queue. Once the file has been completely read, a special null object is placed into the queue, in order to notify the consumer that no more elements will be placed into the queue.
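The end-of-input marker described above can also be a dedicated object instead of null. The following sketch shows that variant under stated assumptions: it swaps in java.util.concurrent.ArrayBlockingQueue as a stand-in for the shared queue to keep the block short (that class rejects null elements, which is one reason to prefer a dedicated marker), and the names SentinelSketch, END and transfer are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class SentinelSketch {

    // Hypothetical end-of-input marker; it is compared by identity.
    private static final Object END = new Object();

    // Sends the words through a small queue and returns how many the consumer
    // received before the marker, or -1 if the calling thread was interrupted.
    static int transfer(List<String> words) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
        int[] received = {0};

        Thread consumer = new Thread(() -> {
            try {
                // Stop as soon as the marker is taken from the queue.
                while (queue.take() != END) {
                    received[0]++;
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String word : words) {
                queue.put(word);
            }
            // Tell the consumer that no more elements will follow.
            queue.put(END);
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("Lorem", "ipsum", "dolor"))); // prints 3
    }
}
```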
Consumer.java:
import java.util.HashSet;
import java.util.Set;

public class Consumer extends Thread {

    // Typed collection instead of the raw Set/HashSet types.
    private final Set<Object> seenObjects = new HashSet<Object>();
    private int total = 0;
    private final SharedFiFoQueue queue;

    public Consumer(SharedFiFoQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            do {
                Object obj = queue.remove();

                // A null element means that the producer has finished.
                if (obj == null)
                    break;

                if (!seenObjects.contains(obj)) {
                    ++total;
                    seenObjects.add(obj);
                }

                System.out.println("[Consumer] Read the element: " + obj.toString());
            } while (true);
        } catch (InterruptedException ex) {
            System.err.println("An InterruptedException was caught: " + ex.getMessage());
            ex.printStackTrace();
        }

        System.out.println("\n[Consumer] " + total + " distinct words have been read...");
    }
}
The Consumer class repeatedly reads elements from the shared queue until the special null object is received. The Consumer class also counts the number of distinct words received from the producer.
ConditionExample.java:
public class ConditionExample {

    public static void main(String[] args) throws InterruptedException {
        SharedFiFoQueue sharedQueue = new SharedFiFoQueue(10);

        // Create a producer and a consumer.
        Thread producer = new Producer(sharedQueue);
        Thread consumer = new Consumer(sharedQueue);

        // Start both threads.
        producer.start();
        consumer.start();

        // Wait for both threads to terminate.
        producer.join();
        consumer.join();
    }
}
In a sample main method, we create one instance of each class and wait for both threads to terminate. A sample execution is shown below:
[Consumer] Read the element: Lorem
[Consumer] Read the element: ipsum
[Consumer] Read the element: dolor
[Consumer] Read the element: sit
...
[Consumer] Read the element: in
[Consumer] Read the element: est.

[Consumer] 80 distinct words have been read...
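The example above uses one producer and one consumer. As noted earlier, the model also supports several of each. The sketch below is one possible way to arrange that, not the original project's code: MultiWorkerSketch, BoundedBuffer and run are hypothetical names, the buffer holds ints, and a negative value serves as the stop marker, with one marker sent per consumer after all producers have finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class MultiWorkerSketch {

    // Hypothetical bounded buffer of ints; a negative value is the stop marker.
    static final class BoundedBuffer {
        private final Queue<Integer> items = new ArrayDeque<>();
        private final int capacity;
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(int item) throws InterruptedException {
            lock.lock();
            try {
                while (items.size() == capacity) {
                    notFull.await();
                }
                items.add(item);
                notEmpty.signal(); // one new item, so waking one consumer is enough
            } finally {
                lock.unlock();
            }
        }

        int take() throws InterruptedException {
            lock.lock();
            try {
                while (items.isEmpty()) {
                    notEmpty.await();
                }
                int item = items.remove();
                notFull.signal(); // one free slot, so waking one producer is enough
                return item;
            } finally {
                lock.unlock();
            }
        }
    }

    // Returns the number of items consumed, or -1 if the caller was interrupted.
    static int run(int producers, int consumers, int itemsPerProducer) {
        BoundedBuffer buffer = new BoundedBuffer(4);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (int c = 0; c < consumers; c++) {
            consumerThreads.add(new Thread(() -> {
                try {
                    // Count items until a stop marker (negative value) arrives.
                    while (buffer.take() >= 0) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int p = 0; p < producers; p++) {
            producerThreads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        buffer.put(i);
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        try {
            for (Thread t : consumerThreads) t.start();
            for (Thread t : producerThreads) t.start();
            for (Thread t : producerThreads) t.join();
            // All items are queued; send one stop marker per consumer.
            for (int c = 0; c < consumers; c++) buffer.put(-1);
            for (Thread t : consumerThreads) t.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2, 100)); // prints 300
    }
}
```

Because the buffer is first-in first-out and the markers are added only after every producer has terminated, each consumer sees all remaining items before it sees a marker.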
Download the Eclipse Project
This was a tutorial about the Condition interface in Java.
You can download the full source code of this example here: ConditionExamples.zip.
Use try/finally when working with locks, see example: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html