Java CompletionService Example
1. Introduction to CompletionService
The JDK provides ready-made “services” for common use cases. The asynchronous Producer-Consumer pattern is one such use case that comes up often in day-to-day software development, so the JDK offers platform-level support for it in the form of java.util.concurrent.CompletionService and java.util.concurrent.ExecutorCompletionService.
CompletionService abstracts the handling of tasks submitted to it by Producers (threads). It relies on an underlying service such as an Executor to process the submitted tasks, and once a task completes, CompletionService takes responsibility for queueing its result. Note that a task is queued when it completes, whether it returned normally or threw an exception; a failure surfaces only when the Consumer calls get() on the returned Future. As results are enqueued, CompletionService also hands them out to Consumers (threads) concurrently, blocking them as necessary.
Notice that, by virtue of this service, the creation of tasks by Producers is decoupled from the Consumers who want to consume the results of those completed tasks. More formally, this is what the javadoc of CompletionService says:
A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers submit tasks for execution. Consumers take completed tasks and process their results in the order they complete.
CompletionService is the interface and ExecutorCompletionService is its concrete implementation. In this post we look at these APIs, the subtleties they entail, and the design choices that arise when a seemingly competing API such as ExecutorService lives in the same package.
2. The CompletionService Programming Model And API
CompletionService is quite easy to work with. ExecutorCompletionService, as the name hints, provides constructors that accept an Executor instance. All we have to do is identify our Producers and Consumers separately, and let the Producers create tasks and submit them, concurrently and/or in parallel, to the ExecutorCompletionService.
These tasks are handed to the Executor for processing. Since the tasks run concurrently, their completion order cannot be guaranteed. Whenever a task completes, its outcome, wrapped in a Future instance, is queued in a BlockingQueue (ExecutorCompletionService has a second constructor that additionally accepts a custom BlockingQueue instance). Concurrent Consumers can then take() or poll() results from this queue, blocking as necessary. Again, it is worth stressing that the Producers producing tasks and the Consumers consuming results are not connected to each other; they are fully decoupled, concurrent activities in their own right. Naturally, if no result has been enqueued yet, a Consumer may have to wait (block).
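The programming model described above can be sketched in a few lines. The following is a minimal illustration, not code from the example project: the class name CompletionOrderDemo, the method name and the sleep durations are assumptions chosen only to make the completion order visible. Three tasks are submitted in the order slow, medium, fast, and the consumer loop should receive them in the order they finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits three tasks with different (hypothetical) durations and returns
    // their results in the order in which the consumer received them.
    static List<String> collectInCompletionOrder() throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(3);
        CompletionService<String> service = new ExecutorCompletionService<>(exec);
        try {
            // Producer side: submission order is slow, medium, fast.
            service.submit(() -> { Thread.sleep(900L); return "slow"; });
            service.submit(() -> { Thread.sleep(500L); return "medium"; });
            service.submit(() -> { Thread.sleep(100L); return "fast"; });

            // Consumer side: take() hands out futures as the tasks finish.
            List<String> received = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                received.add(service.take().get());
            }
            return received;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collectInCompletionOrder());
    }
}
```

Because each task runs on its own pool thread, the shortest task should reach the queue first, so the consumer sees results by completion time and not by submission time.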
3. CompletionService APIs for Producers!
CompletionService exposes two variants of the submit() method, which take a Callable and a Runnable instance respectively; both types represent the task to be executed. The signatures are as follows:
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
And that is all of the API for Producers. Using it is a piece of cake: just wrap your task inside a Callable instance and submit() it to the CompletionService:
CompletionService API for Task Producers: Passing Callable Tasks
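// Fields of the enclosing class
private ExecutorService exec = Executors.newFixedThreadPool(5);
private CompletionService<Double> service = new ExecutorCompletionService<>(exec);

// Inside a method: submit a Callable<Double> written as a lambda
service.submit(() -> 100D);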
A few words are in order. We initialized an ExecutorService with a fixed-size thread pool; this is the pool the CompletionService uses to execute the submitted tasks. Notice the pattern of one service (CompletionService) depending on another service (ExecutorService). We then instantiated an ExecutorCompletionService, passing it the reference to the ExecutorService. At this point our CompletionService is ready to accept tasks, so we call submit() on it and pass a Callable written as a lambda. Our Callable task does nothing worthwhile; for demonstration purposes, it simply returns a hard-coded double value.
CompletionService API for Task Producers: Passing Runnable Tasks
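// Fields of the enclosing class
private ExecutorService exec = Executors.newFixedThreadPool(5);
private CompletionService<Double> service = new ExecutorCompletionService<>(exec);

// Inside a method: submit a Runnable together with the result its Future should yield
service.submit(() -> { doSomeProcessing(); }, 100D);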
As mentioned earlier, we may also pass a Runnable representing the task, and that is what the above snippet does. We pass a Runnable written as a lambda, which does some processing (and does not return any result). Once the Runnable task completes, its Future is queued up with the CompletionService for Consumers to consume; if the task completed successfully, calling get() on that Future returns 100D.
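To see the Runnable variant end to end, here is a small self-contained sketch. The class name RunnableResultDemo, the processed flag and the body of doSomeProcessing() are hypothetical stand-ins introduced only for illustration; the article does not define doSomeProcessing().

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class RunnableResultDemo {

    // Flag flipped by the Runnable so that its side effect can be observed.
    static final AtomicBoolean processed = new AtomicBoolean(false);

    // Hypothetical stand-in for the doSomeProcessing() call used in the snippet above.
    static void doSomeProcessing() {
        processed.set(true);
    }

    static Double submitRunnableWithResult() throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(5);
        CompletionService<Double> service = new ExecutorCompletionService<>(exec);
        try {
            // The Runnable returns nothing; 100D is the value its Future will yield.
            service.submit(() -> doSomeProcessing(), 100D);

            // Blocks until the Runnable has finished, then hands out its Future.
            Future<Double> done = service.take();
            return done.get();
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitRunnableWithResult() + ", processed=" + processed.get());
    }
}
```

The fixed result is useful when the task communicates through side effects and the Consumer only needs a token identifying which task finished.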
4. CompletionService APIs for Consumers!
Similarly convenient APIs exist for Consumer threads to take() or poll() completed tasks from the completion queue. The relevant APIs are:
CompletionService Consumer API
Future<V> take() throws InterruptedException
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException
The take() method is a blocking call: if no completed task is present in the CompletionService queue, it blocks until one becomes available. While blocked, the thread can be interrupted, in which case the method throws InterruptedException.
The poll() method, on the other hand, does not block. It returns null if no completed task is present in the queue at the time of the call. The overloaded poll(long timeout, TimeUnit unit), however, waits up to the stipulated time for a completed task if none is present when it is called; if no result becomes available within this wait period, the method returns null. Note that during the waiting interval, poll(long timeout, TimeUnit unit) might throw an InterruptedException, which means that some other thread requested an interrupt on this thread while it was waiting.
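The difference between the three retrieval methods can be checked with a short sketch. The class name PollVariantsDemo, the 200 ms timeout and the "plate" result are assumptions made for this illustration only.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollVariantsDemo {

    // Returns a short description of what each retrieval call observed.
    static String run() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(exec);
        try {
            // Nothing has been submitted yet: poll() returns null immediately.
            Future<String> immediate = service.poll();

            // Still nothing: the timed poll waits up to 200 ms, then returns null.
            Future<String> timed = service.poll(200L, TimeUnit.MILLISECONDS);

            // Submit one task, then block in take() until it has completed.
            service.submit(() -> "plate");
            Future<String> taken = service.take();

            return (immediate == null) + "," + (timed == null) + "," + taken.get();
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "true,true,plate"
    }
}
```

A Consumer that must stay responsive would typically prefer the timed poll(), since it bounds the wait while still avoiding a busy loop.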
5. The Canteen Service
And that is it! CompletionService has only 5 methods for the Producer-Consumer pattern of tasks, and we have briefly seen them in the preceding sections. We will now use these APIs in a working example, modelling the self-service canteen system of a typical college.
Let's say it's lunch time and the canteen staff are preparing food plates for the canteen counter, from which students pick up their plates. The canteen staff are the Producers, the students are the Consumers, and the canteen counter is the queue. It takes time to prepare food plates and place them at the counter; it might also happen that (i) no student is standing in the queue while food plates pile up at the counter, or (ii) students have already queued up, waiting for food plates to be served at the counter. We will look at these scenarios.
First, we have the FoodPlate class, modelling a typical campus food plate with all sorts of junk in it:
FoodPlate.java
package canteenservice.foodplate;

public class FoodPlate {

    private boolean isPizzaReady;
    private boolean isBurgerReady;
    private boolean isOtherJunkReady;
    private String foodPlateCreatedBy;

    public String getFoodPlateCreatedBy() {
        return foodPlateCreatedBy;
    }

    public void setFoodPlateCreatedBy(String foodPlateCreatedBy) {
        this.foodPlateCreatedBy = foodPlateCreatedBy;
    }

    public boolean isPizzaReady() {
        return isPizzaReady;
    }

    public void setPizzaReady(boolean isPizzaReady) {
        this.isPizzaReady = isPizzaReady;
    }

    public boolean isBurgerReady() {
        return isBurgerReady;
    }

    public void setBurgerReady(boolean isBurgerReady) {
        this.isBurgerReady = isBurgerReady;
    }

    public boolean isOtherJunkReady() {
        return isOtherJunkReady;
    }

    public void setOtherJunkReady(boolean isOtherJunkReady) {
        this.isOtherJunkReady = isOtherJunkReady;
    }
}
Following this, we have the classes to model the canteen staff and the students, respectively:
CanteenStaffProducer.java
package canteenservice.producer;

import java.util.Date;
import java.util.concurrent.Callable;

import canteenservice.foodplate.FoodPlate;

public class CanteenStaffProducer implements Callable<FoodPlate> {

    private String staffName;

    public CanteenStaffProducer(String prodName) {
        this.staffName = prodName;
    }

    @Override
    public FoodPlate call() throws Exception {
        // Simulate the time required to prepare a food plate using Thread.sleep
        System.out.println("Current Canteen Staff at work: " + this.staffName + " at " + new Date());
        Thread.sleep(2000L);

        FoodPlate foodPlate = new FoodPlate();
        foodPlate.setBurgerReady(true);
        foodPlate.setPizzaReady(true);
        foodPlate.setOtherJunkReady(true);
        foodPlate.setFoodPlateCreatedBy(this.staffName);
        return foodPlate;
    }
}
StudentConsumer.java
package canteenservice.consumer;

import java.util.Date;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import canteenservice.foodplate.FoodPlate;

public class StudentConsumer implements Runnable {

    private String studName;
    private CompletionService<FoodPlate> service;

    public StudentConsumer(String studName, CompletionService<FoodPlate> service) {
        this.studName = studName;
        this.service = service;
    }

    public String getStudName() {
        return studName;
    }

    public void setStudName(String studName) {
        this.studName = studName;
    }

    public CompletionService<FoodPlate> getService() {
        return service;
    }

    public void setService(CompletionService<FoodPlate> service) {
        this.service = service;
    }

    @Override
    public void run() {
        System.out.println("Student waiting for foodplate: " + this.studName + " at " + new Date());
        try {
            // Blocks until a food plate (completed task) is available at the counter
            Future<FoodPlate> fp = service.take();
            System.out.println("student got food plate created by: " + fp.get().getFoodPlateCreatedBy());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("Exiting run()");
    }
}
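The StudentConsumer above catches ExecutionException because a task that throws is still handed out by take(); the failure only shows up when get() is called on its Future. A minimal sketch of that behaviour follows. The class name FailedTaskDemo and the "plate dropped" failure are hypothetical and not part of the example project.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailedTaskDemo {

    // Submits a task that always fails and reports what the consumer observes.
    static String describeOutcome() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(exec);
        try {
            // Hypothetical failing task, standing in for a staff member dropping the plate.
            service.submit(() -> { throw new IllegalStateException("plate dropped"); });

            // The failed task is still queued and handed out like any completed task.
            Future<String> future = service.take();
            try {
                return "got: " + future.get();
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return "failed: " + e.getCause().getMessage();
            }
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describeOutcome()); // prints "failed: plate dropped"
    }
}
```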
We also have a helper class to initialize and tender ExecutorCompletionService instance:
CompletionServiceProvider.java
package canteenservice.completionservice;

import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

import canteenservice.foodplate.FoodPlate;

public class CompletionServiceProvider {

    private static final Executor exec = Executors.newCachedThreadPool();
    private static final CompletionService<FoodPlate> completionService =
            new ExecutorCompletionService<>(exec);

    public static Executor getExec() {
        return exec;
    }

    public static CompletionService<FoodPlate> getCompletionservice() {
        return completionService;
    }
}
And finally, the main application class:
CanteenService.java
package canteenservice;

import java.util.concurrent.CompletionService;

import canteenservice.completionservice.CompletionServiceProvider;
import canteenservice.consumer.StudentConsumer;
import canteenservice.foodplate.FoodPlate;
import canteenservice.producer.CanteenStaffProducer;

public class CanteenService {

    public static void main(String[] args) throws Exception {
        /*
         * Scenario1: Canteen Staff (Producers) preparing food plates
         * and no students yet at counter
         */
        // Create a few Canteen Staffs as producers.
        CanteenStaffProducer prod1 = new CanteenStaffProducer("staff1");
        CanteenStaffProducer prod2 = new CanteenStaffProducer("staff2");

        // Submit tasks of food plate creation to the CompletionService
        CompletionService<FoodPlate> compService = CompletionServiceProvider.getCompletionservice();
        //compService.submit(prod1);
        //compService.submit(prod2);

        // Scenario2: Students (Consumers) at the canteen counter
        // but no food plates yet available.
        // Remember to comment out the two submit calls from above
        // to simulate this situation. Note that the following
        // threads would block since we have used CompletionService.take.
        // If you need a non-blocking retrieval of completed tasks
        // (retrieval of food plates), use the poll method.
        new Thread(new StudentConsumer("student1", compService)).start();
        new Thread(new StudentConsumer("student2", compService)).start();

        // Scenario3: For random Producers and Consumers, please uncomment the submit() method calls.
    }
}
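For readers who want to try Scenario 3 without setting up the whole project, the idea can be condensed into a single file. This is only a sketch under simplifying assumptions: the class name CanteenScenarioThree is made up, a plain String (the staff name) stands in for FoodPlate, the preparation time is shortened to 200 ms, and the pool is an ExecutorService so that it can be shut down explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CanteenScenarioThree {

    // Runs two staff producers and two student consumers concurrently and
    // returns the staff names found on the plates the students picked up.
    static List<String> serveLunch() throws Exception {
        ExecutorService staffPool = Executors.newCachedThreadPool();
        CompletionService<String> counter = new ExecutorCompletionService<>(staffPool);
        List<String> pickedUp = new CopyOnWriteArrayList<>();

        // Students queue up first and block in take() until a plate arrives.
        List<Thread> students = new ArrayList<>();
        for (int i = 1; i <= 2; i++) {
            Thread student = new Thread(() -> {
                try {
                    pickedUp.add(counter.take().get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }, "student" + i);
            student.start();
            students.add(student);
        }

        // Staff submit plate preparation tasks (a String stands in for FoodPlate here).
        for (String staff : Arrays.asList("staff1", "staff2")) {
            counter.submit(() -> { Thread.sleep(200L); return staff; });
        }

        // Wait until every student has been served, then release the pool threads.
        for (Thread student : students) {
            student.join();
        }
        staffPool.shutdown();
        return pickedUp;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Plates picked up from: " + serveLunch());
    }
}
```

Which student receives which plate is not deterministic; the only guarantee is that each completed plate is handed to exactly one waiting student.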
6. Conclusion
CompletionService is a service which the JDK provides out of the box to implement common asynchronous Producer-Consumer tasks.
7. Download the Eclipse Project
This was an example demonstrating the CompletionService API from the JDK to asynchronously handle Producer-Consumer tasks.
You can download the full source code of this example here: JavaCompletionServiceExample