
Java 8 Concurrency Tutorial

This article is about the concurrency and parallel processing features of Java 8. Concurrency is a complicated matter, and measured behaviour depends strongly on the hardware used for testing and benchmarking. The purpose of this article is not to compare the performance of the different approaches offered by Java, but to list the new options introduced with Java 8 and to provide some examples of their use.

All code shown in this article has been implemented using Java 8 update 11 and Eclipse Luna (version 4.4). It can be downloaded at the end of this page.
 
 
 

java.util.concurrent new features

The package java.util.concurrent contains several new classes and a number of enhancements to existing ones. We are going to mention the most important ones (in my view) and show how to use the class CompletableFuture.

  • CompletionStage: A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes (from the Oracle official page).
  • CompletableFuture: Class that implements the existing interface Future and the interface CompletionStage mentioned above. It supports lambdas (functional style) and parallelism (its dependent-stage methods have asynchronous variants), and it is event driven. It can be completed or cancelled ad hoc. We will see several examples of this class in this article.
  • Class CountedCompleter: A ForkJoinTask with a completion action performed when triggered and there are no remaining pending actions (from the API).
  • Class CompletionException: This Exception is thrown when an error occurs during the computation of a task.

    CompletableFuture

    As mentioned before, CompletableFuture is one of the most important enhancements in Java 8. It implements the interfaces Future and CompletionStage, providing several functionalities related to futures out of the box. Handling futures and promises in Java is much easier than it was before Java 8.

    In this chapter we are going to see how to initialize CompletableFutures and how to combine them in order to achieve our goals. In comparison with Future, CompletableFuture supports lambdas, offers asynchronous variants of its dependent-stage methods, and is event driven: dependent actions are triggered when a stage completes. Another advantage over Future is that a CompletableFuture can be cancelled or completed ad hoc; we are going to see this afterwards.

    Creation

    It is possible to create a CompletableFuture directly using its constructor:

    				CompletableFuture<String> completableFuture = new CompletableFuture<>();
    			

    or using a factory method that receives the task to run, here asynchronously:

    				CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync( ( ) -> {
    					// big computation task
    					return "100";
    				} );
    			

    In the snippet above we can see the usage of lambdas in combination with CompletableFuture.
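
    As a minimal sketch of both creation styles (the class name CreationSketch and the string values are hypothetical, chosen only for illustration), the following shows that a future built with the constructor stays incomplete until someone completes it, while the factory method runs its supplier asynchronously:

```java
import java.util.concurrent.CompletableFuture;

class CreationSketch {

    // Created with the constructor: nothing runs, so the future stays
    // incomplete until some thread calls complete() on it.
    static CompletableFuture<String> manual() {
        return new CompletableFuture<>();
    }

    // Created with a factory method: the supplier runs asynchronously,
    // by default on the common ForkJoinPool.
    static CompletableFuture<String> viaFactory() {
        return CompletableFuture.supplyAsync(() -> "100");
    }

    public static void main(String[] args) {
        CompletableFuture<String> manual = manual();
        System.out.println("done before complete(): " + manual.isDone());
        manual.complete("set by hand");
        System.out.println("manual: " + manual.join());
        System.out.println("factory: " + viaFactory().join());
    }
}
```

    join() is used here instead of get() only because it does not declare checked exceptions, which keeps the sketch short.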

    Getting results

    In order to get the results of a CompletableFuture we have several options:

  • Using the get() method:
    					System.out.println( "get  " + cf.get() );
    				

    It waits, if necessary indefinitely, until the CompletableFuture is completed or cancelled.

  • Using the getNow(String fallback) method:
    					System.out.println( "get now " + cf.getNow( "now" ) );
    				

    If the result of the computation is not present yet, the fallback passed as parameter is returned.

  • Using the get(long amount, TimeUnit unit) method:
    					System.out.println( "get in 3 seconds " + cf.get( 3, TimeUnit.SECONDS ) );
    				

    It waits at most the given amount of time for the computation to complete and returns the computed value if it becomes available in time; if not, a java.util.concurrent.TimeoutException is thrown.
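
    The three ways of reading a result can be compared side by side. This is only a sketch: the class GettingResultsSketch, its helper methods and the marker strings ("fallback", "timed out") are assumptions made for the example, not part of the article's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GettingResultsSketch {

    // Returns the value if the future is already completed, otherwise the fallback.
    static String readNow(CompletableFuture<String> cf) {
        return cf.getNow("fallback");
    }

    // Waits at most the given number of milliseconds and reports a
    // timeout as a marker string instead of propagating the exception.
    static String readWithTimeout(CompletableFuture<String> cf, long millis) {
        try {
            return cf.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        System.out.println(readNow(cf));               // not completed yet
        System.out.println(readWithTimeout(cf, 100));  // still not completed
        cf.complete("result");
        System.out.println(readNow(cf));
        System.out.println(readWithTimeout(cf, 100));
    }
}
```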

    Completing ad hoc

    As mentioned before, one of the features of this class is the possibility to complete or cancel the future ad hoc or explicitly. We are going to see an example of completion:

    We create a CompletableFuture whose task is not going to end in practice (it keeps computing for ever):

    				CompletableFuture<Integer> completableFutureToBeCompleted2 = CompletableFuture.supplyAsync( ( ) -> {
    					// i only decreases, so the loop condition stays true and the task keeps running
    					for( int i = 0; i < 10; i-- )
    					{
    						System.out.println( "i " + i );
    					}
    					return 10;
    				} );
    			

    If we try to get the result of the CompletableFuture shown above, we are never going to get it. We should complete or cancel it somehow. So we create a new CompletableFuture that is going to complete the first one:

    				CompletableFuture<Integer> completor = CompletableFuture.supplyAsync( ( ) -> {
    					System.out.println( "completing the other" );
    					completableFutureToBeCompleted2.complete( 222 );
    					return 10;
    				} );
    			

    Now we can retrieve the results:

    				 System.out.println( "completor value " + completor.get() );
    				 System.out.println( "tobe completed value " + completableFutureToBeCompleted2.get() );
    			

    And the console will show:

    				...
    				i -918
    				i -919
    				i -920
    				i -921
    				i -922
    				i -923
    				i -924
    				completor value 10
    				i -925
    				tobe completed value 222
    				i -926
    				i -927
    				i -928
    				i -929
    				i -930
    				i -931
    				i -932
    				...
    			

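    We can see that the value of the first CompletableFuture is 222, the value passed to the complete() method. Note that the loop keeps printing afterwards: complete() sets the result of the future but does not stop the task that is still running.

    Even if this code is not really useful, it shows how to complete a CompletableFuture ad hoc.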
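
    The return values of complete() and the effect of cancel() are easier to see without the endless loop. The following is a minimal sketch under that simplification; the class AdHocCompletionSketch and the values 222 and 333 are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class AdHocCompletionSketch {

    // complete() returns true only for the call that actually sets the
    // value; later calls are ignored and return false.
    static String completeTwice() {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        boolean first = pending.complete(222);
        boolean second = pending.complete(333);
        return first + " " + second + " " + pending.join();
    }

    // After cancel(), join() throws a CancellationException.
    static boolean cancelledThrows() {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        pending.cancel(true);
        try {
            pending.join();
            return false;
        } catch (CancellationException e) {
            return pending.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println(completeTwice());    // prints "true false 222"
        System.out.println(cancelledThrows());  // prints "true"
    }
}
```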

    Joining

    It is possible to chain CompletableFutures and use their results in further calculations using the methods thenApply() and thenCompose():

    				CompletableFuture completableFutureBigCompute = CompletableFuture.supplyAsync( ( ) -> {
    				// big computation
    					return "10";
    				} );
    
    				CompletableFuture thenCompose = completableFutureBigCompute.thenCompose(CombiningCompletableFutures::continueWithVeryImportantThing );
    
    				CompletableFuture<CompletableFuture> thenApply = completableFutureBigCompute.thenApply(CombiningCompletableFutures::continueWithSomethingElse );
    
    				System.out.println( "thenCompose " + thenCompose.get() );
    				System.out.println( "thenApply " + thenApply.get() ); // is already completed
    				System.out.println( "thenApply " + thenApply.isDone() ); // is already completed
    
    				CompletableFuture thenCompose2 = completableFutureBigCompute.thenCompose(CombiningCompletableFutures::continueWithVeryImportantThing );
    
    				// difference between compose and apply: thenCompose uses the value of the source
    				System.out.println( "thenCompose2 " + thenCompose2.get() ); 
    			

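    In the snippet above we see the usage of the two methods; they are quite similar in the sense that both use the result produced by the source CompletableFuture. The difference lies in what the passed function returns: thenApply() wraps whatever the function returns in a new future (here a CompletableFuture of a CompletableFuture), while thenCompose() expects a function that itself returns a CompletionStage and flattens the result into a single future.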
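
    Since the helper methods of the snippet above are not shown, here is a self-contained sketch of the same idea. The class ApplyVsComposeSketch and the method parseAsync are hypothetical stand-ins for the article's helpers.

```java
import java.util.concurrent.CompletableFuture;

class ApplyVsComposeSketch {

    // Hypothetical asynchronous step: parses the text on another thread.
    static CompletableFuture<Integer> parseAsync(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    // thenApply with a plain function: String -> Integer.
    static CompletableFuture<Integer> applied(CompletableFuture<String> source) {
        return source.thenApply(Integer::parseInt);
    }

    // thenApply with a function that returns a future yields a nested future.
    static CompletableFuture<CompletableFuture<Integer>> nested(CompletableFuture<String> source) {
        return source.thenApply(ApplyVsComposeSketch::parseAsync);
    }

    // thenCompose with the same function flattens the result into one future.
    static CompletableFuture<Integer> composed(CompletableFuture<String> source) {
        return source.thenCompose(ApplyVsComposeSketch::parseAsync);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> "10");
        System.out.println(applied(source).join());        // prints 10
        System.out.println(nested(source).join().join());  // prints 10, needs two joins
        System.out.println(composed(source).join());       // prints 10
    }
}
```

    The return types carry the point: nested() needs two join() calls to reach the value, composed() needs one.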

    Combining

    It is also possible to combine two totally independent CompletableFutures by using the method thenCombine().

    Imagine that in order to access our web page, users need to enter a login and a password, and they have to be located in a specific country because the content of our web site is only allowed in some regions. We then need a validation process that checks the login and password against our databases and a checker that verifies where the user’s PC is located. To grant access, both checks must succeed.

    We can program something like this using CompletableFuture:

    				String login = "dani", password = "pass", land = "spain";
    				CompletableFuture<Boolean> loginCompletable = checkLogin( login, password );
    				CompletableFuture<Boolean> checkLandCompletable = checkLand( land );
    				// welcome() runs once both checks have completed
    				CompletableFuture<String> welcomeOrNot = loginCompletable.thenCombine( checkLandCompletable,
    						( loginOk, landOk ) -> welcome( loginOk, landOk ) );

    				System.out.println( welcomeOrNot.get() );

    				// helper methods used above
    				private static String welcome( Boolean login, Boolean land )
    				{
    					if( login && land )	return "welcome";
    					return "not welcome";
    				}

    				private static CompletableFuture<Boolean> checkLand( String land )
    				{
    					// only users located in Spain are allowed
    					return CompletableFuture.supplyAsync( ( ) -> {
    						// big task with back end dependencies
    						return "spain".equals( land );
    					} );
    				}

    				private static CompletableFuture<Boolean> checkLogin( String login, String password )
    				{
    					return CompletableFuture.supplyAsync( ( ) -> {
    						// very hard authentication process
    						return login != null && password != null;
    					} );
    				}
    
    			

    We can see how the method thenCombine handles both checkers.

    Exception handling

    There are several options to handle exceptions while working with CompletableFuture. We are going to see how to use the methods exceptionally() and handle():

  • exceptionally():
    This method is very useful in case we want to catch exceptions produced during the computation of the completable future and provide some basic recovery mechanism:

    				CompletableFuture<Integer> completableFutureException = CompletableFuture.supplyAsync( ( ) -> {
    					// big task
    					return 10 / 2;
    					// return 10/0 // produces an exception, division by zero
    				} );
    
    				CompletableFuture<Integer> fallback = completableFutureException.exceptionally( x -> 0 );
    				System.out.println( "results: " + fallback.get() );
    			

    The code shown above consists of two completables: the first one computes “very complicated” formulas where several exceptions may arise; the second one “waits” for the results of the computation and in case an exception happens, provides a basic fallback. In the snippet no exception is thrown during the calculation, so the result would be:

    				results: 5
    			
  • handle():
    In case we want to provide more sophisticated handling for the returned results and the exceptions that may happen during the computation, we can use the method handle():

    				CompletableFuture<Integer> completableFutureHandleOk = CompletableFuture.supplyAsync( ( ) -> {
    					// big task
    					return 10 / 0; // exception division by zero
    					// return 10 / 2;
    				} );
    
    				CompletableFuture<Integer> handleOkError = completableFutureHandleOk.handle( ( ok, ex ) -> {
    					if( ex == null )
    					{
    						// return the value if everything ok
    						return ok;
    					}
    					else
    					{
    						// in case of an exception print the stack trace and return null
    						ex.printStackTrace();
    						return null;
    					}
    				} );
    				
    				System.out.println( "ok or error ? " + handleOkError.get() );
    			

    In this case an exception is thrown during the computation, so the stack trace is printed and null is returned:

    				java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    					at java.util.concurrent.CompletableFuture.internalComplete(Unknown Source)
    					at java.util.concurrent.CompletableFuture$AsyncSupply.exec(Unknown Source)
    					at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    					at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
    					at java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    					at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    					at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
    				Caused by: java.lang.ArithmeticException: / by zero
    					at com.danibuiza.javacodegeeks.concurrency.completablefuture.CompletableFutureFactories.lambda$5(CompletableFutureFactories.java:45)
    					at com.danibuiza.javacodegeeks.concurrency.completablefuture.CompletableFutureFactories$$Lambda$7/2001049719.get(Unknown Source)
    					... 6 more
    				ok or error ? null
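
    The exceptionally() example above only exercises the success path. As a hedged sketch, the following runs both methods on a computation that can be made to fail on demand; the class ExceptionHandlingSketch, the method divide and the "ok"/"error" prefixes are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionHandlingSketch {

    // Hypothetical computation: fails with ArithmeticException when divisor is 0.
    static CompletableFuture<Integer> divide(int divisor) {
        return CompletableFuture.supplyAsync(() -> 10 / divisor);
    }

    // exceptionally(): replaces a failure with a fallback value and
    // passes a successful result through unchanged.
    static int withFallback(int divisor) {
        return divide(divisor).exceptionally(ex -> 0).join();
    }

    // handle(): invoked in both cases; ex is null on success.
    static String describe(int divisor) {
        return divide(divisor).handle((value, ex) -> {
            if (ex == null) {
                return "ok " + value;
            }
            // The failure usually arrives wrapped in a CompletionException.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            return "error " + cause.getClass().getSimpleName();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(withFallback(2));  // prints 5
        System.out.println(withFallback(0));  // prints 0
        System.out.println(describe(2));      // prints "ok 5"
        System.out.println(describe(0));      // prints "error ArithmeticException"
    }
}
```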
    			

    The methods that attach dependent actions, such as thenApply(), thenCompose(), thenCombine() and handle(), are available both in a default and in an asynchronous version; it is only necessary to add the suffix “Async” to the method name. In Java 8, exceptionally() is the exception and has no Async variant.

    We explained some examples of usage but, as you can see in the API of the CompletableFuture class, there are more than 50 methods available, so my recommendation is to visit the official Oracle documentation in order to learn more about it.
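
    To illustrate the “Async” suffix, this sketch uses thenApplyAsync() with an explicit executor and reports which thread ran the dependent function. The class AsyncVariantSketch and the thread name "sketch-worker" are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantSketch {

    // Runs the dependent function on the supplied executor and returns
    // the name of the thread that executed it.
    static String threadUsedByAsyncStage() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "sketch-worker"));
        try {
            return CompletableFuture.completedFuture("ignored")
                    .thenApplyAsync(value -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedByAsyncStage()); // prints "sketch-worker"
    }
}
```

    Without the executor argument, the Async variants run on the common ForkJoinPool by default; the non-Async variants may run on whichever thread completes the source future or on the caller's thread.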

    java.util.concurrent.atomic new features

    Java 8 offers new possibilities to maintain shared numeric variables such as sums, counts or accumulations, which were a common source of concurrency problems in Java applications. There are 4 new small classes in the package java.util.concurrent.atomic (descriptions from the Oracle API) that help to maintain common variables used by different threads:

  • DoubleAccumulator: One or more variables that together maintain a running double value updated using a supplied function.
  • DoubleAdder: One or more variables that together maintain an initially zero double sum.
  • LongAccumulator: One or more variables that together maintain a running long value updated using a supplied function.
  • LongAdder: One or more variables that together maintain an initially zero long sum.

    Compared to the “old” atomic variables, the main benefit is the performance that these classes provide when many threads update the same variable. The code below shows the main functionalities of the DoubleAccumulator class:

    		 //creation implementing the binary operator
    			double identity = 0.0; // must be the identity of the operator: 0.0 for addition
    			DoubleBinaryOperator operator = new DoubleBinaryOperator()
    			{
    
    				@Override
    				public double applyAsDouble( double x, double y )
    				{
    					return x + y;
    				}
    			};
    			DoubleAccumulator doubleAcum = new DoubleAccumulator( operator, identity );
    			
    			//equivalent using lambdas
    			DoubleAccumulator doubleAcumLambda = new DoubleAccumulator( (x,y)->x+y, identity);
    		
    			//several get methods, with castings
    			System.out.println( "get " + doubleAcum.get() );
    			System.out.println( "byteValue " + doubleAcum.byteValue() );
    			System.out.println( "doubleValue " + doubleAcum.doubleValue() );
    			System.out.println( "floatValue " + doubleAcum.floatValue() );
    			System.out.println( "intValue " + doubleAcum.intValue() );
    
    			//reset to identity after get
    			System.out.println( "get then reset " + doubleAcum.getThenReset() );
    			
    			//accumulates the passed value
    			doubleAcum.accumulate( 80.9 );
    			
    			//resets again
    			doubleAcum.reset();
    		

    The code above shows how to use the class DoubleAccumulator and its main methods. Nothing very interesting so far. The important thing here is the fact that this class can accumulate a double (or a long, for its long counterpart) that is updated by many threads in parallel, so we do not need to take care of synchronizing reads and writes or of locks ourselves.

    The classes DoubleAdder and LongAdder are specializations of the accumulator classes for the common case of a sum. The same behaviour as a DoubleAdder can be achieved by creating an accumulator using the following expression:
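
    A small sketch of that multi-threaded use, assuming a parallel IntStream as a convenient way to produce concurrent updates (the class AccumulatorSketch and its methods are hypothetical):

```java
import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.stream.IntStream;

class AccumulatorSketch {

    // Adds 0.5 from many parallel tasks without any explicit locking.
    static double sumInParallel(int tasks) {
        DoubleAccumulator sum = new DoubleAccumulator(Double::sum, 0.0);
        IntStream.range(0, tasks).parallel().forEach(i -> sum.accumulate(0.5));
        return sum.get();
    }

    // Same class, different function: keeps the running maximum instead of a sum.
    static double maxInParallel(int tasks) {
        DoubleAccumulator max = new DoubleAccumulator(Math::max, Double.NEGATIVE_INFINITY);
        IntStream.range(0, tasks).parallel().forEach(i -> max.accumulate(i));
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(sumInParallel(1000)); // prints 500.0
        System.out.println(maxInParallel(1000)); // prints 999.0
    }
}
```

    The value 0.5 is chosen because sums of it are exact in floating point, so the result does not depend on the order in which the threads accumulate.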

    			DoubleAccumulator doubleAcumLambda = new DoubleAccumulator( (x,y)->x+y, 0.0);
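
    For the long counterpart, a typical use is an event counter shared by several worker threads. This is only a sketch; the class LongAdderSketch, the thread pool and the one-minute timeout are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class LongAdderSketch {

    // Every worker increments the shared counter; no locks are needed.
    static long countEvents(int threads, int incrementsPerThread) {
        LongAdder counter = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for all workers before reading the final sum.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countEvents(4, 10_000)); // prints 40000
    }
}
```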
    		

    In the code attached at the end of this article you can find many more examples of the usage of DoubleAccumulator and LongAdder in concurrent environments.

    Streams and ParallelStreams

    Streams also benefit from the concurrency support in Java 8; they offer parallelism via the parallel() and parallelStream() methods. The class Arrays also takes advantage of the Fork/Join framework and provides parallel methods such as parallelSort(), parallelPrefix() and parallelSetAll().

    As an example we can have a look at the results produced by the parallel and the non-parallel methods for sorting arrays:
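
    A minimal sketch of a parallel stream next to its sequential twin, together with two further parallel methods of java.util.Arrays. The class ParallelStreamSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class ParallelStreamSketch {

    // Same pipeline, sequential or parallel; both give the same sum.
    static long sumOfSquares(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToLong(Integer::longValue)
                .map(v -> v * v)
                .sum();
    }

    // parallelSetAll fills the array and parallelPrefix replaces each
    // element with the running total up to that position.
    static long[] runningTotals(int size) {
        long[] values = new long[size];
        Arrays.parallelSetAll(values, i -> i + 1);   // 1, 2, 3, ...
        Arrays.parallelPrefix(values, Long::sum);    // 1, 3, 6, ...
        return values;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumOfSquares(numbers, false));           // prints 30
        System.out.println(sumOfSquares(numbers, true));            // prints 30
        System.out.println(Arrays.toString(runningTotals(4)));      // prints [1, 3, 6, 10]
    }
}
```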

    		Double[] unsorted = new Double[10000000];
            for( int i = 0; i < unsorted.length; i++ )
            {
                unsorted[i] = random( i, unsorted.length );
            }
    
            long start = System.currentTimeMillis();
            Arrays.sort( unsorted );
            long end = System.currentTimeMillis();
    
            System.out.println( "time: " + ( end - start ) + " ms." );
    
            start = System.currentTimeMillis();
            Arrays.parallelSort( unsorted );
            end = System.currentTimeMillis();
    
            System.out.println( "time: " + ( end - start ) + " ms." );
    	

    The results are:

    		time: 6639 ms.
    		time: 679 ms.
    	

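    So parallel processing looks much faster in this case. Note, however, that the second measurement sorts an array that the first call has already sorted, so the two figures are not directly comparable; for a fair comparison the array must be re-initialized with random values (or copied before sorting) for the second call. As we said at the beginning of this article, concurrency is a very complicated matter and cannot be analyzed without a larger set of data and proper benchmarking. This is just an example of usage of one of the new methods available that support concurrency in Java 8.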
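
    A fairer variant of the comparison gives each sort its own copy of the same unsorted data. The following is a sketch only: the class SortComparisonSketch, the seed and the array size are assumptions, it uses a primitive double[] instead of the Double[] of the article, and a single timed run is still not a real benchmark.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.Consumer;

class SortComparisonSketch {

    // Reproducible pseudo-random input.
    static double[] randomData(int size, long seed) {
        return new Random(seed).doubles(size).toArray();
    }

    // Times one run of the given sort on its own private copy, so the
    // original data stays unsorted for the next measurement.
    static long timeMillis(double[] data, Consumer<double[]> sort) {
        double[] copy = Arrays.copyOf(data, data.length);
        long start = System.nanoTime();
        sort.accept(copy);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Sanity check: both methods must produce the same ordering.
    static boolean sameResult(double[] data) {
        double[] sequential = data.clone();
        double[] parallel = data.clone();
        Arrays.sort(sequential);
        Arrays.parallelSort(parallel);
        return Arrays.equals(sequential, parallel);
    }

    public static void main(String[] args) {
        double[] data = randomData(2_000_000, 42L);
        System.out.println("sort:         " + timeMillis(data, Arrays::sort) + " ms");
        System.out.println("parallelSort: " + timeMillis(data, Arrays::parallelSort) + " ms");
        System.out.println("same result:  " + sameResult(data));
    }
}
```

    The timings depend on the machine, the number of cores and JIT warm-up, so no expected figures are given here.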

    Summary

    So, that’s it. We covered many of the new concurrency and parallel processing options that Java 8 offers. We saw examples of CompletableFuture and of the atomic DoubleAccumulator, and how Streams and Arrays can work in parallel by making use of the Java Fork/Join framework.

    Links

    In this article we saw several features and improvements for handling concurrency and parallelism in Java 8, but there are others that are not covered. In order to get more information about them and other Java 8 novelties, please visit the following links:

  • http://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/changes8.html
  • http://www.javacodegeeks.com/2013/05/java-8-definitive-guide-to-completablefuture.html
  • http://www.javacodegeeks.com/2013/05/java-8-completablefuture-in-action.html
  • http://www.javacodegeeks.com/2014/05/java-8-features-tutorial.html
  • Download the code

    Download
    All examples from this article (and some more) can be downloaded from the following link: concurrencyJava8.

    Dani Buiza

    Daniel Gutierrez Diez holds a Master in Computer Science Engineering from the University of Oviedo (Spain) and a Post Grade as Specialist in Foreign Trade from the UNED (Spain). Daniel has been working for different clients and companies in several Java projects as programmer, designer, trainer, consultant and technical lead.

    2 Comments
    DEEPAK PANDEY
    5 years ago

    CompletableFuture has non-blocking calls and Future.get() is a blocking call. Callbacks in CompletableFuture will also block the next action if the output of the first callback is the input for the second callback. Please suggest.

    Dia
    1 year ago

    The sorting example is not done correctly, the time changes in the second sort because the array is already sorted, it will depend on the parallelism of your computer but to demonstrate the difference the array must be re-initialized with random values for the second example.
