Reactive Java (RxJava) Tutorial: Advanced
Reactive Java, or RxJava, is an implementation and enhancement of the observer pattern. It was intended for use in event-driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. It abstracts some of the more complex tasks associated with asynchronous operations, including threading and concurrency.
This example builds on the basics described in Part One of this topic. If you are not familiar with the basics, please review that article before moving on to these more advanced concepts. This article examines asynchronous and concurrent tasks that are made easier with RxJava. The concurrency problems that RxJava is well suited to, and that we will look at in this example, include:
- Nested callbacks
- Making asynchronous calls
- Aggregating or combining asynchronous calls
- Streaming
1. Setup
We used Eclipse Neon, Java 8, Maven 3.3.9 and RxJava 2.0.0. At the time of writing, we ran into issues using the latest version, 2.0.7, with Eclipse because the source was not available. This example uses a simple Java application to demonstrate reactive functionality.
pom.xml
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.0</version>
    </dependency>
2. Simple Asynchronous Call
Let’s start by exploring the asynchronous capabilities in RxJava. In the next few examples we will use RxJava to spawn new threads to do various tasks. By default, an Observable does its work and delivers its items on the thread that calls the subscribe method. You can introduce an asynchronous process using the subscribeOn method. Here we will look at a simple asynchronous call in RxJava.
Notice that in this example we subscribe on the Schedulers.newThread() scheduler and pass a lambda to Flowable.create. The lambda implements the subscribe method of the FlowableOnSubscribe interface and receives a FlowableEmitter. The FlowableEmitter contract requires that onNext, onError and onComplete are called sequentially, never concurrently; when several threads might emit, the emitter's serialize method returns a wrapper that enforces this. In this example only one thread touches the emitter, so we call it directly. The body of the lambda acts as the worker thread and sleeps for three seconds while the main thread moves ahead. You can see from the output that the flowable message, “Executing async flowable.” (RxJavaExampleAdvanced:51), is logged after the main thread logs “Finished simple async” (RxJavaExampleAdvanced:59).
    public static void simpleAsync() {
        logger.info("Starting simple async");

        // The lambda runs on the thread supplied by subscribeOn, not on the caller's thread.
        Flowable.create((FlowableEmitter<String> s) -> {
            try {
                logger.info("Executing async flowable.");
                Thread.sleep(3000);
                logger.info("Finished async flowable.");
            } catch (Exception e) {
                // Ignored in this demo.
            }
            s.onComplete();
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe();

        logger.info("Finished simple async");
    }
Output
    2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:47 - Starting simple async
    2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:59 - Finished simple async
    2017-07-16 10:35:03 INFO RxJavaExampleAdvanced:51 - Executing async flowable.
    2017-07-16 10:35:06 INFO RxJavaExampleAdvanced:53 - Finished async flowable.
3. Asynchronous Web Service Call
A common usage of RxJava is to make long-running calls, or calls with unpredictable finish times, asynchronously. This allows the code to handle other tasks while it waits for the long-running call to finish. For example, a client user interface might make a web service call for data asynchronously so it can finish displaying the components that do not depend on that data. In the next example, we will explore using RxJava to make asynchronous calls to a web service. We make one simple call to a web service or API that returns a String result. You’ll notice that the API call is executed on a new thread, “RxNewThreadScheduler-1”, not the main thread. Again we pass a lambda to Flowable.create; it receives a FlowableEmitter and makes a REST call to our API on the new thread.
    public static void simpleAsyncAPICalls() {
        logger.info("Starting async api");
        logger.info("Main Thread: {}", Thread.currentThread().getName());
        Flowable.create((FlowableEmitter<String> s) -> {
            try {
                // This assignment was missing from the listing; the URL is assumed to be
                // the one used by the multi-call example in section 4.
                String result = makeCallString("http://localhost:8080/jcg/service/stream/no");
                logger.info("Emitted thread: {}", Thread.currentThread().getName());
                logger.info("Result: {}", result);
                s.onNext(result);
            } catch (Exception e) {
                s.onError(e);
            }
            s.onComplete();
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe(logger::info);

        logger.info("Ending async api");
    }

    // Blocking GET that returns the response body as a String.
    private static String makeCallString(String URI) {
        RestTemplate restTemplate = new RestTemplate();
        String result = restTemplate.getForObject(URI, String.class);
        return result;
    }
Output
    2017-07-29 10:49:25 INFO RxJavaExampleAdvanced:63 - Starting async api
    2017-07-29 10:49:25 INFO RxJavaExampleAdvanced:64 - Main Thread: main
    2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:77 - Ending async api
    2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:68 - Emitted thread: RxNewThreadScheduler-1
    2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:69 - Result: Hello Stream!
    2017-07-29 10:49:26 INFO RxJavaExampleAdvanced:? - Hello Stream!
4. Multiple Asynchronous Web Service Calls
Many times you’ll need to make multiple calls to a web service. In this next example we’ll leverage the map function of RxJava to execute and return the response from multiple API calls. As a reminder, the function passed to map returns a plain value of the stream’s data type, while the function passed to flatMap returns a Flowable whose items are merged into the output. Returning a Flowable from flatMap allows you to take further reactive actions on each response. You’ll see this in later examples, but in this case we are just outputting the result and do not need an inner Flowable.
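The map versus flatMap distinction can be tried without any RxJava setup. The following is a minimal sketch that uses java.util.stream from the JDK as a stand-in, not RxJava itself; the class and method names are hypothetical. The shape of the two functions is the same as in RxJava: the function given to map returns a value, and the function given to flatMap returns a stream-like object. One difference to keep in mind is that a JDK stream flattens the inner streams in order, while RxJava's flatMap merges inner sources as their items arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only analogy (java.util.stream, not RxJava). All names here are hypothetical.
class MapVsFlatMapSketch {

    // map: the function returns a plain value, so there is one output per input.
    static List<String> withMap(List<String> paths) {
        return paths.stream()
                .map(p -> "response for " + p)
                .collect(Collectors.toList());
    }

    // flatMap: the function returns a stream per input; the inner streams are flattened.
    static List<String> withFlatMap(List<String> paths) {
        return paths.stream()
                .flatMap(p -> Stream.of(p + "#1", p + "#2"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("/a", "/b");
        System.out.println(withMap(paths));
        System.out.println(withFlatMap(paths));
    }
}
```

Running main prints the mapped list followed by the flattened list, which holds two entries for each input path.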
    public static void multipleAsyncAPICalls() {
        logger.info("Starting multi async api");
        logger.info("Main Thread: {}", Thread.currentThread().getName());
        // The listing was truncated after the second URL; four entries are assumed
        // because the output shows four results.
        Flowable.fromArray("http://localhost:8080/jcg/service/stream/no",
                "http://localhost:8080/jcg/service/stream/no",
                "http://localhost:8080/jcg/service/stream/no",
                "http://localhost:8080/jcg/service/stream/no")
                .map(new Function<String, String>() {
                    int resultCount = 0;

                    @Override
                    public String apply(String t) throws Exception {
                        String result = makeCallString(t);
                        logger.info("Emitted thread: {}", Thread.currentThread().getName());
                        logger.info("Result {}: {}", resultCount++, result);
                        return result + " on " + Thread.currentThread().getName();
                    }
                }).subscribeOn(Schedulers.newThread()).subscribe(logger::info);
        logger.info("Ending multi async api");
    }
Output
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:123 - Starting multi async api
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:124 - Main Thread: main
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:137 - Ending multi async api
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 0: Hello Stream!
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 1: Hello Stream!
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 2: Hello Stream!
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:133 - Result 3: Hello Stream!
    2017-07-16 10:46:12 INFO RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
While this code runs the emitted calls on another thread, it does not create a new thread for each item: all four calls run one after another on RxNewThreadScheduler-1. If we want each item to be processed on its own thread, we need to do something slightly different. As a developer leveraging these reactive mechanisms, you need to think carefully about which calls you want to make sequentially and which you want to make concurrently. If you need to use the results of earlier calls in later calls, you will want them to execute one after another. If you just need all the data returned and are not concerned with the order in which it arrives, you have a good candidate for making each call asynchronously, which can improve overall performance.
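The trade-off described above can be sketched with plain JDK code. This is an illustrative sketch that uses CompletableFuture rather than RxJava, and fakeCall is a hypothetical stand-in for a remote call such as makeCallString. The first method chains a call that depends on an earlier result; the second starts independent calls together and then gathers their results.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// JDK-only sketch with CompletableFuture (not RxJava). All names here are hypothetical.
class CallOrderingSketch {

    // Stand-in for a remote call; a real version would block on network I/O.
    static String fakeCall(String input) {
        return "result(" + input + ")";
    }

    // Dependent calls: the second call needs the first result, so the steps run in sequence.
    static String dependentCalls() {
        return CompletableFuture.supplyAsync(() -> fakeCall("first"))
                .thenApply(first -> fakeCall(first))
                .join();
    }

    // Independent calls: every call is started before any result is awaited.
    static List<String> independentCalls(List<String> inputs) {
        List<CompletableFuture<String>> pending = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> fakeCall(in)))
                .collect(Collectors.toList());
        // Joining in list order keeps the results in input order.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(dependentCalls());
        System.out.println(independentCalls(Arrays.asList("a", "b", "c")));
    }
}
```

Collecting the futures into a list before joining is what lets the independent calls overlap; joining inside the first stream pipeline would wait for each call before starting the next.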
We will start with a simple asynchronous example where multiple items are emitted and build on that to use the same technique for multiple API calls. There’s a lot going on in this small sample. We use the flatMap method to turn each emitted item into its own Observable. We need to do that so we can call subscribeOn on each inner Observable, which executes the corresponding operations on a different thread. This is how we ensure that each emitted item is handled asynchronously, rather than the whole group being executed sequentially on one separate thread. Notice that each of the two items in this example gets its own thread: "Thread[RxNewThreadScheduler-1,5,main]" and "Thread[RxNewThreadScheduler-2,5,main]".
    public static void simpleAsyncMulti() {
        logger.info("Starting multi async");

        Observable.just(1, 2)
                .flatMap(item -> Observable.just(item.toString()).subscribeOn(Schedulers.newThread())
                        .doOnNext(i -> logger.info(Thread.currentThread().toString())))
                .subscribe(logger::info);

        logger.info("Ending multi async");
    }
Output
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:63 - Starting multi async
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:70 - Ending multi async
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-1,5,main]
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-2,5,main]
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:? - 1
    2017-07-16 10:36:49 INFO RxJavaExampleAdvanced:? - 2
Now let’s take a look at an asynchronous example where we spawn a new thread for each API call. Again we map each emitted value to a new Observable with a single item and call subscribeOn with a new thread. You can see in this sample that a different thread executes each request, e.g. Thread[RxNewThreadScheduler-4,5,main].
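    // The source of this chain was missing from the listing; four copies of the service URL
    // are assumed because the output shows four calls to that address.
    Observable.just("http://localhost:8080/jcg/service/stream/no",
            "http://localhost:8080/jcg/service/stream/no",
            "http://localhost:8080/jcg/service/stream/no",
            "http://localhost:8080/jcg/service/stream/no")
            .flatMap(item -> Observable.just(item).subscribeOn(Schedulers.newThread()).doOnNext(i -> {
                logger.info(makeCallString(i));
                logger.info(Thread.currentThread().toString());
            })).subscribe(System.out::println);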
Output
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-4,5,main]
    http://localhost:8080/jcg/service/stream/no
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-3,5,main]
    http://localhost:8080/jcg/service/stream/no
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Hello Stream!
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-2,5,main]
    http://localhost:8080/jcg/service/stream/no
    2017-07-04 08:57:22 INFO RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-1,5,main]
    http://localhost:8080/jcg/service/stream/no
5. Combine Asynchronous Results
To build on this technique, we will make multiple API calls asynchronously, each on its own thread, and use the zip method to combine the results. zip pairs the first item from each source, then the second, and so on, and passes each pair to a combining function.
    public static void flatMapZipAsyncAPICalls() {
        Flowable<String> result = Flowable.create((FlowableEmitter<String> s) -> {
            try {
                // The listing omits the line that assigns r, presumably a makeCallString(...)
                // request; its URL is not given in the source.
                logger.info("Emitted thread: {}", Thread.currentThread().getName());
                logger.info("Result: {}", r);
                s.onNext(r);
            } catch (Exception e) {
                s.onError(e);
            }
            s.onComplete();
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread());
        Flowable<String> result2 = Flowable.create((FlowableEmitter<String> s) -> {
            try {
                // As above, the assignment of r to the second service's response is omitted.
                logger.info("Emitted thread: {}", Thread.currentThread().getName());
                logger.info("Result: {}", r);
                s.onNext(r);
            } catch (Exception e) {
                s.onError(e);
            }
            s.onComplete();
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread());
        // Concatenate the two responses once both sources have emitted.
        Flowable.zip(result, result2, (s, s2) -> s + s2).subscribe(System.out::println);
    }
Output
    2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:120 - Emitted thread: RxNewThreadScheduler-1
    2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:121 - Result: ["1","2","3"]
    2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:131 - Emitted thread: RxNewThreadScheduler-2
    2017-08-14 17:59:43 INFO RxJavaExampleAdvanced:132 - Result: ["test1","test2","test3"]
    ["1","2","3"]["test1","test2","test3"]
6. Streaming Results
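Finally, let’s examine streaming the results of asynchronous API calls, where the results are emitted as they become available. This example builds on the concepts introduced previously, with the key addition of ObservableHttp calls that leverage a CloseableHttpAsyncClient. Note that this listing uses RxJava 1.x types from the rx package (rx.Observable, Func1, Action1) rather than the io.reactivex types used so far.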
    public static void streamObserable() throws URISyntaxException, IOException, InterruptedException {
        logger.info("Executing Streaming Observable Over Http");
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
        httpclient.start();

        // The request-creation call was garbled in the listing ("ObservableHttp httpclient)").
        // It is reconstructed here as ObservableHttp.createRequest(...); the endpoint URL was lost,
        // so streamUrl is a placeholder name and not part of the original code.
        ObservableHttp.createRequest(HttpAsyncMethods.createGet(streamUrl), httpclient)
                .toObservable().flatMap(new Func1<ObservableHttpResponse, rx.Observable<String>>() {
                    @Override
                    public rx.Observable<String> call(ObservableHttpResponse response) {
                        // Convert each chunk of the response body to a String as it arrives.
                        return response.getContent().map(new Func1<byte[], String>() {
                            @Override
                            public String call(byte[] bb) {
                                logger.info("timestamp inner "
                                        + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                                logger.info("counter: " + RxJavaExample3.counter++);
                                return new String(bb);
                            }
                        });
                    }
                // Emit a batch every 5 seconds or every 5 items, whichever comes first.
                }).buffer(5, TimeUnit.SECONDS, 5, rx.schedulers.Schedulers.io())
                .subscribeOn(rx.schedulers.Schedulers.io()).subscribe(new Action1<List<String>>() {
                    @Override
                    public void call(List<String> resp) {
                        logger.info("timestamp "
                                + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                        logger.info(resp.toString());
                    }
                });
    }
Output
    2017-08-14 18:06:20 INFO RxJavaExampleAdvanced:143 - Executing Streaming Observable Over Http
    2017-08-14 18:06:23 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:23 PM
    2017-08-14 18:06:23 INFO RxJavaExampleAdvanced:159 - counter: 0
    2017-08-14 18:06:25 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:25 PM
    2017-08-14 18:06:25 INFO RxJavaExampleAdvanced:159 - counter: 1
    2017-08-14 18:06:26 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:26 PM
    2017-08-14 18:06:26 INFO RxJavaExampleAdvanced:171 - [data:Message 2, data:Message 1]
    2017-08-14 18:06:27 INFO RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:27 PM
    2017-08-14 18:06:27 INFO RxJavaExampleAdvanced:159 - counter: 2
    2017-08-14 18:06:31 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:31 PM
    2017-08-14 18:06:31 INFO RxJavaExampleAdvanced:171 - [data:Message 0]
    2017-08-14 18:06:36 INFO RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:36 PM
    2017-08-14 18:06:36 INFO RxJavaExampleAdvanced:171 - []
Here is the Spring Boot streaming resource that ties it together.
    @Controller
    @RequestMapping("/stream")
    public class StreamController {

        private SseEmitter sseEmitter;

        ...

        @RequestMapping("/event2")
        public SseEmitter getRealTimeMessageAction2(HttpServletRequest request) {
            SseEmitter sseEmitter = new SseEmitter();
            runAsync(sseEmitter);
            return sseEmitter;
        }

        // Starts three threads that share one emitter. They sleep 6, 4 and 2 seconds,
        // so message 2 is sent first and message 0 last.
        private void runAsync(SseEmitter sseEmitter) {
            for (int i = 0; i < 3; i++) {
                AsyncThread at = new AsyncThread();
                at.setEmitter(sseEmitter);
                at.setSleep((6 - (i * 2)) * 1000);
                at.setMessageId(i);
                at.start();
            }
        }

        private class AsyncThread extends Thread {
            private SseEmitter sseEmitter;
            private int sleep;
            private int id;

            public void setEmitter(SseEmitter sseEmitter) {
                this.sseEmitter = sseEmitter;
            }

            public void setSleep(int sleep) {
                this.sleep = sleep;
            }

            public void setMessageId(int id) {
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    Thread.sleep(this.sleep);
                    logger.info("Timestamp:" + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                    this.sseEmitter.send("Message " + this.id);
                } catch (InterruptedException | IOException e) {
                    logger.error(e.getMessage());
                }
            }
        }
    }
7. Summary
In this example, we looked at making asynchronous calls using RxJava, including calls to RESTful web services. We examined how certain usages make all asynchronous calls on a single thread and how to use Observables to make each call on a separate thread. We also looked at combining the results of multiple calls and finally streaming results of service calls.
8. Download the Source Code
Here we demonstrated how to use the basic RxJava operations.