Java 8 Concurrency Tutorial
This article is about the concurrency and parallel processing features in Java 8. It is necessary to mention that concurrency is a very complicated matter and that results depend strongly on the hardware used for testing and benchmarking. The purpose of this article is not to compare the performance of the different approaches offered by Java, but to list the new options that came out with the latest Java version and to provide some examples of their use.
All code shown in this article has been implemented using Java 8 update 11 and Eclipse Luna version 4.4. The code can be downloaded at the end of this page.
java.util.concurrent new features
The package java.util.concurrent contains several new classes and various enhancements to existing ones. We are going to mention the most important ones (in my view) and show how to use the class CompletableFuture.
CompletionStage: A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes (from the official Oracle page).
CompletableFuture: Class that implements the existing interface Future (and the interface CompletionStage mentioned before). It supports lambdas (functional style) and parallelism (most of its chaining methods can run asynchronously), and it is event driven. It can be completed or cancelled ad hoc. We will see a couple of examples of this class in this article.
CountedCompleter: A ForkJoinTask with a completion action performed when triggered and there are no remaining pending actions (from the API).
CompletionException: This exception is thrown when an error occurs during the computation of a task.
CompletableFuture
As mentioned before, CompletableFuture is one of the most important enhancements in Java 8. It implements the interfaces Future and CompletionStage, providing several functionalities related to futures out of the box. Handling futures and promises in Java is now much easier than it was before Java 8.
In this chapter we are going to see how to initialize CompletableFutures and how to combine them in order to achieve our goals. In comparison with Future, CompletableFuture supports lambdas, offers asynchronous variants of most of its chaining methods, and is event driven, so follow-up actions are triggered when a future completes, fails or is cancelled. Another advantage over Future is that a CompletableFuture can be cancelled or completed ad hoc; we are going to see this afterwards.
Creation
It is possible to create a CompletableFuture directly using its constructor:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
or using factory methods, where the sync or async mode is specified and the task to run is passed in:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync( ( ) -> {
    // big computation task
    return "100";
} );
In the snippet above we can see lambdas used in combination with CompletableFuture.
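As a rough, self-contained sketch of the two creation styles described above: the class name CreationSketch and its helper methods are made up for illustration and are not part of the article's downloadable code. A future created through the constructor stays incomplete until some code completes it, while supplyAsync() runs the supplier on another thread (by default the common Fork/Join pool).

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not taken from the article's code.
class CreationSketch {

    // Created through the constructor: incomplete until somebody completes it.
    static CompletableFuture<String> manual() {
        return new CompletableFuture<>();
    }

    // Created through a factory method: the supplier runs asynchronously.
    static CompletableFuture<String> async() {
        return CompletableFuture.supplyAsync(() -> "100");
    }

    public static void main(String[] args) {
        CompletableFuture<String> manual = manual();
        System.out.println(manual.isDone());   // prints "false"
        manual.complete("done by hand");
        System.out.println(manual.join());     // prints "done by hand"
        System.out.println(async().join());    // prints "100"
    }
}
```

join() is used here instead of get() only because it does not declare checked exceptions, which keeps the sketch short.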
Getting results
In order to get the result of a CompletableFuture we have several options:
get() method:
System.out.println( "get " + cf.get() );
Blocks until the CompletableFuture is completed or cancelled, however long that takes.
getNow(T valueIfAbsent) method:
System.out.println( "get now " + cf.getNow( "now" ) );
If the result of the computation is not available yet, the fallback passed as parameter is returned.
get(long timeout, TimeUnit unit) method:
System.out.println( "get in 3 seconds " + cf.get( 3, TimeUnit.SECONDS ) );
Waits at most the given time for the result; if the future has not completed by then, a java.util.concurrent.TimeoutException is thrown.
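The following sketch puts the three retrieval options side by side on a future that is deliberately left incomplete at first. The class GettingResultsSketch and its helper methods are hypothetical names chosen for this example; the checked exceptions declared by the timed get() are translated into plain messages just to keep the output readable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not taken from the article's code.
class GettingResultsSketch {

    // getNow(): never blocks, returns the fallback if no result is available yet.
    static String peek(CompletableFuture<String> cf) {
        return cf.getNow("not ready");
    }

    // Timed get(): waits at most the given time, then gives up with a TimeoutException.
    static String waitBriefly(CompletableFuture<String> cf, long millis) {
        try {
            return cf.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        System.out.println(peek(cf));             // prints "not ready"
        System.out.println(waitBriefly(cf, 50));  // prints "timed out"
        cf.complete("value");
        System.out.println(peek(cf));             // prints "value"
        System.out.println(waitBriefly(cf, 50));  // prints "value"
    }
}
```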
Completing ad hoc
As mentioned before, one of the features of this class is the possibility to complete or cancel the future ad hoc or explicitly. We are going to see an example of completion:
We create a completable that, for practical purposes, is not going to end (it keeps counting down and printing):
CompletableFuture<Integer> completableFutureToBeCompleted2 = CompletableFuture.supplyAsync( ( ) -> {
    for( int i = 0; i < 10; i-- ) {
        System.out.println( "i " + i );
    }
    return 10;
} );
If we try to get the result of the completable shown above, we will effectively never get it. We should complete or cancel it somehow. So we create a new CompletableFuture that is going to complete the first one:
CompletableFuture<Integer> completor = CompletableFuture.supplyAsync( ( ) -> {
    System.out.println( "completing the other" );
    completableFutureToBeCompleted2.complete( 222 );
    return 10;
} );
Now we can retrieve the results:
System.out.println( completor.get() );
System.out.println( completableFutureToBeCompleted2.get() );
And the console will show:
...
i -918
i -919
i -920
i -921
i -922
i -923
i -924
completor value 10
i -925
tobe completed value 222
i -926
i -927
i -928
i -929
i -930
i -931
i -932
...
We can see that the value of the first one is 222, the value passed to the complete() method.
Even if this code is not really useful, it shows how to complete one completable future ad hoc.
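A smaller, deterministic sketch of the same idea is given below, using a future that has no task attached at all. AdHocCompletionSketch and its methods are hypothetical names. It illustrates two details: only the first complete() call has an effect, and a cancelled future reports the cancellation to anyone waiting for the result. Note also that, as the console output above suggests, completing a future by hand does not stop a task that is still running; the loop keeps printing after the value 222 has been delivered.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not taken from the article's code.
class AdHocCompletionSketch {

    // complete() returns true only for the call that actually completes the future.
    static boolean[] completeTwice(CompletableFuture<Integer> cf) {
        boolean first = cf.complete(222);
        boolean second = cf.complete(333); // ignored, the future is already complete
        return new boolean[] { first, second };
    }

    // Waiting on a cancelled future ends with a CancellationException.
    static boolean cancelledFutureThrows() {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        cf.cancel(true);
        try {
            cf.join();
            return false;
        } catch (CancellationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        boolean[] outcome = completeTwice(pending);
        // prints "true false 222"
        System.out.println(outcome[0] + " " + outcome[1] + " " + pending.join());
        System.out.println(cancelledFutureThrows()); // prints "true"
    }
}
```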
Joining
It is possible to chain CompletableFutures and use their results in further calculations using the methods thenApply() and thenCompose():
CompletableFuture completableFutureBigCompute = CompletableFuture.supplyAsync( ( ) -> {
    // big computation
    return "10";
} );
CompletableFuture thenCompose = completableFutureBigCompute.thenCompose( CombiningCompletableFutures::continueWithVeryImportantThing );
CompletableFuture<CompletableFuture> thenApply = completableFutureBigCompute.thenApply( CombiningCompletableFutures::continueWithSomethingElse );
System.out.println( "thenCompose " + thenCompose.get() );
System.out.println( "thenApply " + thenApply.get() );
// is already completed
System.out.println( "thenApply " + thenApply.isDone() );
// is already completed
CompletableFuture thenCompose2 = completableFutureBigCompute.thenCompose( CombiningCompletableFutures::continueWithVeryImportantThing );
// difference between compose and apply: thenCompose uses the value of the source
System.out.println( "thenCompose2 " + thenCompose2.get() );
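In the snippet above we see the usage of the two methods; they are similar in the sense that both use the result produced by the source completable. The difference lies in what the supplied function returns: with thenApply() it returns a plain value, which is wrapped in a new future, while with thenCompose() it returns another CompletionStage, whose result becomes the result of the returned future. This is why passing a function that itself returns a CompletableFuture to thenApply() produces the nested type seen above.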
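Since the helper methods referenced in the snippet are not shown, here is a minimal stand-alone sketch of the same contrast. ApplyVsComposeSketch and parseAsync are hypothetical names invented for this example; parseAsync plays the role of a follow-up step that is itself asynchronous.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not taken from the article's code.
class ApplyVsComposeSketch {

    // A follow-up step that is itself asynchronous and returns a future.
    static CompletableFuture<Integer> parseAsync(String s) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(s));
    }

    static CompletableFuture<String> source() {
        return CompletableFuture.supplyAsync(() -> "10");
    }

    // thenApply with a plain function: String -> Integer.
    static int viaApply() {
        CompletableFuture<Integer> result = source().thenApply(Integer::parseInt);
        return result.join();
    }

    // thenCompose with a function returning a future: the result is flattened.
    static int viaCompose() {
        CompletableFuture<Integer> result = source().thenCompose(ApplyVsComposeSketch::parseAsync);
        return result.join();
    }

    // thenApply with a function returning a future: the result is nested.
    static int viaNestedApply() {
        CompletableFuture<CompletableFuture<Integer>> nested =
                source().thenApply(ApplyVsComposeSketch::parseAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaApply());       // prints "10"
        System.out.println(viaCompose());     // prints "10"
        System.out.println(viaNestedApply()); // prints "10"
    }
}
```

All three variants end up with the same number; what changes is the type that has to be unwrapped to reach it.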
Combining
It is also possible to combine two totally independent completables by using the method thenCombine().
Imagine that in order to get access to our web page, users need to enter a login and password and have to be located in a specific country, because the content of our web site is only allowed in some regions. We then need a validation process that checks the login and password against our databases and a checker that determines where the user’s PC is located. To grant access, both checks have to succeed.
We can program something like this using CompletableFuture:
String login = "dani", password = "pass", land = "spain";
CompletableFuture<Boolean> loginCompletable = checkLogin( login, password );
CompletableFuture<Boolean> checkLandCompletable = checkLand( land );
CompletableFuture<String> welcomeOrNot = loginCompletable.thenCombine( checkLandCompletable, ( loginOk, landOk ) -> welcome( loginOk, landOk ) );

String welcome( Boolean login, Boolean land ) {
    if( login && land )
        return "welcome";
    return "not welcome";
}

private static CompletableFuture<Boolean> checkLand( String land ) {
    // only Spanish are allowed
    return CompletableFuture.supplyAsync( ( ) -> {
        // big task with back end dependencies
        return "spain".equals( land );
    } );
}

private static CompletableFuture<Boolean> checkLogin( String login, String password ) {
    return CompletableFuture.supplyAsync( ( ) -> {
        // very hard authentication process
        return login != null && password != null;
    } );
}

System.out.println( welcomeOrNot.get() );
We can see how the method thenCombine handles both checkers.
Exception handling
There are several options to handle exceptions while working with CompletableFuture. We are going to see how to use the methods exceptionally() and handle():
exceptionally(): This method is very useful when we want to catch exceptions produced during the computation of the completable future and provide some basic recovery mechanism:
CompletableFuture<Integer> completableFutureException = CompletableFuture.supplyAsync( ( ) -> {
    // big task
    return 10 / 2;
    // return 10 / 0; // produces an exception, division by zero
} );
CompletableFuture<Integer> fallback = completableFutureException.exceptionally( x -> 0 );
System.out.println( fallback.get() );
The code shown above consists of two completables: the first one computes “very complicated” formulas where several exceptions may arise; the second one “waits” for the results of the computation and in case an exception happens, provides a basic fallback. In the snippet no exception is thrown during the calculation, so the result would be:
results: 5
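To see the fallback actually being used, the sketch below runs the same kind of division once with a valid divisor and once with zero. ExceptionallySketch and divideOrZero are hypothetical names made up for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class, not taken from the article's code.
class ExceptionallySketch {

    // Divides asynchronously; any exception during the computation is replaced by 0.
    static int divideOrZero(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                .exceptionally(ex -> 0)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideOrZero(10, 2)); // prints "5"
        System.out.println(divideOrZero(10, 0)); // prints "0"
    }
}
```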
handle(): In case we want more sophisticated handling of both the returned result and the exceptions that may happen during the computation, we can use the method handle():
CompletableFuture<Integer> completableFutureHandleOk = CompletableFuture.supplyAsync( ( ) -> {
    // big task
    return 10 / 0; // exception division by zero
    // return 10 / 2;
} );
CompletableFuture<Integer> handleOkError = completableFutureHandleOk.handle( ( ok, ex ) -> {
    if( ok != null ) {
        // return the value if everything ok
        return ok;
    } else {
        // in case of an exception print the stack trace and return null
        ex.printStackTrace();
        return null;
    }
} );
System.out.println( "ok or error ? " + handleOkError.get() );
In this case an exception is thrown during the computation, so the stack trace is printed out and null is returned:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    at java.util.concurrent.CompletableFuture.internalComplete(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.lang.ArithmeticException: / by zero
    at com.danibuiza.javacodegeeks.concurrency.completablefuture.CompletableFutureFactories.lambda$5(CompletableFutureFactories.java:45)
    at com.danibuiza.javacodegeeks.concurrency.completablefuture.CompletableFutureFactories$$Lambda$7/2001049719.get(Unknown Source)
    ... 6 more
ok or error ? null
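A possible variation on the handle() callback is sketched below. It decides based on the exception argument rather than on the value, which also works when a computation legitimately returns null, and it unwraps the CompletionException that appears in the stack trace above to reach the original cause. HandleSketch and describe are hypothetical names invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration class, not taken from the article's code.
class HandleSketch {

    // Turns either outcome of the division into a short description.
    static String describe(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                .handle((value, ex) -> {
                    if (ex == null) {
                        return "ok: " + value;
                    }
                    // Failures of an async task usually arrive wrapped in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "error: " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(10, 2)); // prints "ok: 5"
        System.out.println(describe(10, 0)); // prints "error: ArithmeticException"
    }
}
```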
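Most of the chaining methods shown in the examples before (thenApply(), thenCompose(), thenCombine(), handle()) are available in both a plain and an asynchronous version; it is only necessary to add the suffix “Async” to the method name. The Async versions run the callback as a separate task, by default on the common Fork/Join pool or on an Executor passed as an extra argument. In Java 8, exceptionally() has no Async counterpart.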
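The effect of the suffix can be observed by looking at the thread that runs the callback. In the sketch below, which uses the hypothetical names AsyncSuffixSketch and "callback-pool", the source future is already complete, so the plain thenApply() callback runs directly in the calling thread, while thenApplyAsync() hands the callback to the executor supplied as second argument.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class, not taken from the article's code.
class AsyncSuffixSketch {

    // Plain variant on an already completed future: the callback runs in the caller's thread.
    static String plainCallbackThread() {
        return CompletableFuture.completedFuture("10")
                .thenApply(s -> Thread.currentThread().getName())
                .join();
    }

    // Async variant with an explicit executor: the callback runs on that executor's thread.
    static String asyncCallbackThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            return CompletableFuture.completedFuture("10")
                    .thenApplyAsync(s -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(plainCallbackThread()); // name of the calling thread
        System.out.println(asyncCallbackThread()); // prints "callback-pool"
    }
}
```

When the source future is not complete yet, the plain variant typically runs in whichever thread completes it, so the thread is less predictable than in this sketch.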
We explained some examples of usage, but as you can see in the API of the CompletableFuture class, there are more than 50 methods available, so my recommendation is to visit the official Oracle documentation in order to learn more about it.
java.util.concurrent.atomic new features
Java 8 offers new possibilities to maintain atomic variables like sums, counts or accumulations. These were a common concurrency problem in Java applications. There are 4 new small classes in the package java.util.concurrent.atomic (descriptions from the Oracle API) that help to maintain common variables used by different threads:
DoubleAccumulator: One or more variables that together maintain a running double value updated using a supplied function.
DoubleAdder: One or more variables that together maintain an initially zero double sum.
LongAccumulator: One or more variables that together maintain a running long value updated using a supplied function.
LongAdder: One or more variables that together maintain an initially zero long sum.
Compared to the “old” atomic variables, the main benefit is performance: under high contention these classes are expected to offer higher throughput, at the expense of using more memory. The code below shows the main functionality of the DoubleAccumulator class:
//creation implementing the binary operator
double identity = 1.0;
DoubleBinaryOperator operator = new DoubleBinaryOperator() {
    @Override
    public double applyAsDouble( double x, double y ) {
        return x + y;
    }
};
DoubleAccumulator doubleAcum = new DoubleAccumulator( operator, identity );
//equivalent using lambdas
DoubleAccumulator doubleAcumLambda = new DoubleAccumulator( (x,y)->x+y, identity);
//several get methods, with castings
System.out.println( "get " + doubleAcum.get() );
System.out.println( "byteValue " + doubleAcum.byteValue() );
System.out.println( "doubleValue " + doubleAcum.doubleValue() );
System.out.println( "floatValue " + doubleAcum.floatValue() );
System.out.println( "intValue " + doubleAcum.intValue() );
//reset to identity after get
System.out.println( "get then reset " + doubleAcum.getThenReset() );
//accumulates the passed value
doubleAcum.accumulate( 80.9 );
//resets again
doubleAcum.reset();
The code above shows how to use the class DoubleAccumulator and its main methods. Nothing very interesting for the moment. The important thing here is that this class can accumulate a double (or a long, for the long counterpart) that is updated by many threads in parallel, so we do not need to take care of concurrent read and write access or of locks ourselves.
The classes LongAdder and DoubleAdder are special cases of the accumulators. The behaviour of a DoubleAdder, for example, can be achieved by creating an accumulator with the following expression:
DoubleAccumulator doubleAcumLambda = new DoubleAccumulator( (x,y)->x+y, 0.0);
In the code attached at the end of this article you can find many more examples of the usage of DoubleAccumulator and LongAdder in concurrent environments.
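As a small illustration of such concurrent use (this is a sketch written for this section, not the downloadable code), the class below lets several threads increment one shared LongAdder and uses a DoubleAccumulator with Double::max to track a running maximum. The names AdderSketch, countInParallel and maxOf are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration class, not taken from the article's code.
class AdderSketch {

    // Several threads increment one shared counter without explicit locking.
    static long countInParallel(int threads, int incrementsPerThread) {
        LongAdder counter = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.sum();
    }

    // An accumulator is not limited to sums: here it keeps the largest value seen.
    static double maxOf(double... values) {
        DoubleAccumulator max = new DoubleAccumulator(Double::max, Double.NEGATIVE_INFINITY);
        Arrays.stream(values).parallel().forEach(max::accumulate);
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(4, 10_000)); // prints "40000"
        System.out.println(maxOf(1.5, 80.9, 3.0));      // prints "80.9"
    }
}
```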
Streams and ParallelStreams
Streams also benefit from the concurrency support in Java 8; they offer parallelism via the parallel() and parallelStream() methods. Arrays also takes advantage of the Fork/Join framework and provides parallel methods such as parallelSort(), parallelPrefix() and parallelSetAll().
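A minimal sketch of both entry points follows: parallel() on an existing stream and parallelStream() on a collection. The class and method names are hypothetical. Both operations are simple reductions, so the result is the same as in the sequential case; only the way the work is split differs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical illustration class, not taken from the article's code.
class ParallelStreamSketch {

    // parallel(): switches an existing stream to parallel execution.
    static long sumOfSquares(long n) {
        return LongStream.rangeClosed(1, n)
                .parallel()
                .map(x -> x * x)
                .sum();
    }

    // parallelStream(): asks a collection for a parallel stream directly.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .mapToInt(String::length)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10));                       // prints "385"
        System.out.println(totalLength(Arrays.asList("java", "8"))); // prints "5"
    }
}
```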
As an example we can have a look at the results produced by the parallel and non-parallel methods for sorting arrays:
Double[] unsorted = new Double[10000000];
for( int i = 0; i < unsorted.length; i++ ) {
    unsorted[i] = random( i, unsorted.length );
}
long start = System.currentTimeMillis();
Arrays.sort( unsorted );
long end = System.currentTimeMillis();
System.out.println( "time: " + ( end - start ) + " ms." );
start = System.currentTimeMillis();
Arrays.parallelSort( unsorted );
end = System.currentTimeMillis();
System.out.println( "time: " + ( end - start ) + " ms." );
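The results are:
time: 6639 ms.
time: 679 ms.
So the parallel version appears to be much faster in this case. Note, however, that in the snippet above the second call sorts an array that the first call has already sorted, so these timings overstate the benefit; for a fair comparison each method should receive its own unsorted copy of the data. As we said at the beginning of this article, concurrency is a very complicated matter and cannot be analyzed without a larger set of data and proper benchmarking. This is just an example of usage of one of the new methods available that support concurrency in Java 8.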
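One way to make the comparison somewhat fairer is to hand each method its own copy of the same random data, since sorting an already sorted array is not representative. The sketch below does that; SortComparisonSketch, randomArray and timeMillis are hypothetical names, the array uses primitive double values rather than the Double objects of the original snippet, and the size and seed are arbitrary choices. It is still only a rough measurement, not a proper benchmark: JIT warm-up, garbage collection and the number of available cores all influence the numbers.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical illustration class, not taken from the article's code.
class SortComparisonSketch {

    // Builds an array of pseudo-random doubles; the seed makes runs repeatable.
    static double[] randomArray(int size, long seed) {
        return new Random(seed).doubles(size).toArray();
    }

    // Measures the wall-clock time of a task in milliseconds.
    static long timeMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        double[] forSequential = randomArray(5_000_000, 42L);
        double[] forParallel = forSequential.clone(); // same data, still unsorted

        long sequential = timeMillis(() -> Arrays.sort(forSequential));
        long parallel = timeMillis(() -> Arrays.parallelSort(forParallel));

        System.out.println("Arrays.sort:         " + sequential + " ms.");
        System.out.println("Arrays.parallelSort: " + parallel + " ms.");
        System.out.println("same result: " + Arrays.equals(forSequential, forParallel));
    }
}
```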
Summary
So, that’s it. We covered many of the new concurrency and parallel processing options that Java 8 offers. We saw examples of CompletableFuture, of the atomic DoubleAccumulator, and of how Streams can be used to work in parallel by making use of the Java Fork/Join framework.
Links
In this article we saw several features and improvements for handling concurrency and parallelism in Java 8, but there are others that are not covered. In order to get more information about them and other Java 8 novelties, please visit the following links:
Download the code
All examples from this article (and some more) can be downloaded in the following link: concurrencyJava8.
CompletableFuture offers non-blocking calls, whereas Future.get() is a blocking call. Callbacks in CompletableFuture will also block the next action if the output of the first callback is the input for the second callback. Please suggest.
The sorting example is not done correctly, the time changes in the second sort because the array is already sorted, it will depend on the parallelism of your computer but to demonstrate the difference the array must be re-initialized with random values for the second example.