Home » Core Java » Reactive Java (RxJava) Tutorial: Advanced

About Andy Beck

Andy Beck
Andy is a Senior Software Engineer that has sixteen years of experience working on web, desktop and mobile applications. He holds a Bachelor and Master's degree in Computer Science from the University of Virginia and George Washington University respectively. He specializes in working with Java web services and has significant experience working web applications, databases and continuous integration and deployments. He is currently working as a technical lead at a financial technology organization where he supports mobile application services in Java.

Reactive Java (RxJava) Tutorial: Advanced

Reactive Java or RxJava is an implementation and enhancement of the observer pattern. It was intended for use in event driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. It abstracts some of the more complex tasks associated with asynchronous operations including threading and concurrency.

This example builds on the basics described in Part One of this topic, https://examples.javacodegeeks.com/core-java/reactive-java-rxjava-tutorial-introduction. If you are not familiar with the basics, please look back at this previous article before moving forward with these more advanced concepts. This article will examine asynchronous or concurrency tasks that are made easier with RxJava. The concurrency problems that RxJava is suitable to solve and we will look at in this example include:

  • Nested callbacks
  • Making asynchronous calls
  • Aggregating or combining asynchronous calls
  • Streaming

1. Setup

We used Eclipse Neon, Java 8, Maven 3.3.9 and RxJava 2.0.0.  At the time of this sample, we ran into issues using the last version, 2.0.7, with Eclipse as the source was not available. This example uses a simple Java application to demonstrate Reactive functionality.

pom.xml

<dependency>
  <groupId>io.reactivex.rxjava2</groupId>
  <artifactId>rxjava</artifactId>
  <version>2.0.0</version>
</dependency>

2. Simple Asynchronous Call

Let’s start by exploring the asynchronous capabilities in RxJava. In the next few examples we will use RxJava to spawn new threads to do various tasks. The default behavior of an Observable is to observe on the same thread where the subscribe method is called. You can introduce an asynchronous process using the subscribeOn method. Here we will look at a simple asynchronous call in RxJava.

Notice that in this example we are subscribing on the Schedulers.newThread() scheduler and using lambda notation to execute the serialize method of the FlowableEmitter interface. The FlowableEmitter interface requires that we call the onNext, onError and onComplete methods sequentially. In this example, we will simplify things and use lambda notation to call the serialize method which will ensure that these methods are serialized. The body of this method will act as the worker thread and will sleep for three seconds while the process moves ahead. You can see from the output that the flowable messages, “RxJavaExampleAdvanced:51 – Executing async flowable…”, are logged after the main thread finishes “RxJavaExampleAdvanced:59 – Finished simple async”.

  
    public static void simpleAsync() {
            logger.info("Starting simple async");

            Flowable.create((FlowableEmitter s) -> {
                    try {
                            logger.info("Executing async flowable.");
                            Thread.sleep(3000);
                            logger.info("Finished async flowable.");
                    } catch (Exception e) {
                    }
                    s.onComplete();
            }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe();

            logger.info("Finished simple async");
    }

Output

2017-07-16 10:35:03 INFO  RxJavaExampleAdvanced:47 - Starting simple async
2017-07-16 10:35:03 INFO  RxJavaExampleAdvanced:59 - Finished simple async
2017-07-16 10:35:03 INFO  RxJavaExampleAdvanced:51 - Executing async flowable.
2017-07-16 10:35:06 INFO  RxJavaExampleAdvanced:53 - Finished async flowable.

3. Asynchronous Web Service Call

A common usage of RxJava is to make long running calls or calls with unpredictable finish times asynchronously. This allows the code to handle other tasks while it is waiting for the long running call to finish. You might see a client user interface make a web service call for data asynchronously so it can finish displaying the components to the user that are not dependent on the data in the service call. In the next example, we will explore using RxJava to make asynchronous calls to a web service. We make one simple call to a web service or API that returns a String result. You’ll notice that the API is executed on a new thread “RxNewThreadScheduler-1” not the main thread. Again we use the FlowableEmitter interface with lambda notation to execute the serialize method which makes a rest call to our API on a new thread.

 
    public static void simpleAsyncAPICalls() {
            logger.info("Starting async api");
            logger.info("Main Thread: {}", Thread.currentThread().getName());
            Flowable.create((FlowableEmitter s) -> {
                    try {
                    String result = makeCallString("http://localhost:8080/jcg/service/stream/no");
                    logger.info("Emitted thread: {}", Thread.currentThread().getName());
                    logger.info("Result: {}", result);
                    s.onNext(result);
                    } catch (Exception e) {
                            s.onError(e);
                    }
                    s.onComplete();
            }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread()).subscribe(logger::info);

            logger.info("Ending async api");
    }

    private static String makeCallString(String URI) {
            RestTemplate restTemplate = new RestTemplate();
            String result = restTemplate.getForObject(URI, String.class);
            return result;
    }

Output

2017-07-29 10:49:25 INFO  RxJavaExampleAdvanced:63 - Starting async api
2017-07-29 10:49:25 INFO  RxJavaExampleAdvanced:64 - Main Thread: main
2017-07-29 10:49:26 INFO  RxJavaExampleAdvanced:77 - Ending async api
2017-07-29 10:49:26 INFO  RxJavaExampleAdvanced:68 - Emitted thread: RxNewThreadScheduler-1
2017-07-29 10:49:26 INFO  RxJavaExampleAdvanced:69 - Result: Hello Stream!
2017-07-29 10:49:26 INFO  RxJavaExampleAdvanced:? - Hello Stream!

4. Multiple Asynchronous Web Service Calls

Many times you’ll need to make multiple calls to a web service. In this next example we’ll leverage the map function of RxJava to execute and return the response from multiple API calls. As a reminder, the map function returns the data type of the Flowable while flatMap will return the Flowable object. Using the Flowable that is returned from a flatMap call would allow you to take further reactive actions on the response. You’ll see this in later examples but in this case we are just outputing the result and will not need to make use of the Flowable.

 
	public static void multipleAsyncAPICalls() {
		logger.info("Starting multi async api");
		logger.info("Main Thread: {}", Thread.currentThread().getName());
		Flowable.fromArray("http://localhost:8080/jcg/service/stream/no", "http://localhost:8080/jcg/service/stream/no",
				"http://localhost:8080/jcg/service/stream/no", "http://localhost:8080/jcg/service/stream/no")
				.map(new Function() {
					int resultCount = 0;
					@Override
					public String apply(String t) throws Exception {
						String result = makeCallString(t);
						logger.info("Emitted thread: {}", Thread.currentThread().getName());
						logger.info("Result {}: {}", resultCount++, result);
						return result + " on " + Thread.currentThread().getName();
					}
				}).subscribeOn(Schedulers.newThread()).subscribe(logger::info);
		logger.info("Ending multi async api");
	}

Output

2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:123 - Starting multi async api
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:124 - Main Thread: main
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:137 - Ending multi async api
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:133 - Result 0: Hello Stream!
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:133 - Result 1: Hello Stream!
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:133 - Result 2: Hello Stream!
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:132 - Emitted thread: RxNewThreadScheduler-1
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:133 - Result 3: Hello Stream!
2017-07-16 10:46:12 INFO  RxJavaExampleAdvanced:? - Hello Stream! on RxNewThreadScheduler-1

While this code will operate the emitted calls on another thread it does not create a new thread for each item. If we want to have our process execute on a new thread for each item we will need to do something slightly different. As a developer leveraging these reactive mechanisms you need to think carefully about which calls you want to make synchronously versus those that you want to make asynchronously. If you need to use the results of earlier calls in later calls, you will want each to execute synchronously. If you just need all the data returned and you are not concerned with the order in which it is returned, you might have a good candidate to make each call asynchronously which will improve the overall performance.

We will start with a simple asynchronous example where multiple items are emitted and build on that to use the same technique for multiple API calls. There’s a lot going on in this small sample. We are using the flatMap method to operate on an array of items and transform the array to another Observable for each item. We’ll need to do that to ensure that we can subscribeOn each item which will execute those corresponding operations on a different thread. This is how we ensure that each emitted item is handled asychronously rather than the group sequentially executed on a separate thread. Notice that we have a different thread for both of the items in this example; "Thread[RxNewThreadScheduler-1,5,main]", "Thread[RxNewThreadScheduler-2,5,main]".

 
    public static void simpleAsyncMulti() {
            logger.info("Starting multi async");

            Observable.just(1, 2)
                            .flatMap(item -> Observable.just(item.toString()).subscribeOn(Schedulers.newThread())
                                            .doOnNext(i -> logger.info(Thread.currentThread().toString())))
                            .subscribe(logger::info);

            logger.info("Ending multi async");
    }

Output

2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:63 - Starting multi async
2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:70 - Ending multi async
2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-1,5,main]
2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:67 - Thread[RxNewThreadScheduler-2,5,main]
2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:? - 1
2017-07-16 10:36:49 INFO  RxJavaExampleAdvanced:? - 2

Now lets take a look at an asynchronous example where we spawn a new thread for each API call. Again we map each emitted value to a new Observable with a single item and subscribeOn to a new thread. Again, you can see in this sample that the thread id that is executing each request is different, i.e. Thread:Thread[RxNewThreadScheduler-4,5,main].

 
    Observable
        .just("http://localhost:8080/jcg/service/stream/no", "http://localhost:8080/jcg/service/stream/no",
                        "http://localhost:8080/jcg/service/stream/no", "http://localhost:8080/jcg/service/stream/no")
        .flatMap(item -> Observable.just(item).subscribeOn(Schedulers.newThread()).doOnNext(i -> {
                logger.info(makeCallString(i));
                logger.info(Thread.currentThread().toString());
        })).subscribe(System.out::println);

Output

2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Hello Stream!
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-4,5,main]
http://localhost:8080/jcg/service/stream/no
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Hello Stream!
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-3,5,main]
http://localhost:8080/jcg/service/stream/no
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Hello Stream!
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Hello Stream!
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-2,5,main]
http://localhost:8080/jcg/service/stream/no
2017-07-04 08:57:22 INFO  RxJavaExampleAdvanced:189 - Thread[RxNewThreadScheduler-1,5,main]
http://localhost:8080/jcg/service/stream/no

5. Combine Asynchronous Results

To build on this technique, we will make multiple API calls and zip or combine the results. We are making multiple API calls asychronously, each on its own thread, and using the zip method to combine the results.

 
    public static void flatMapZipAsyncAPICalls() {
            Flowable result = Flowable.create((FlowableEmitter s) -> {
                    try {
                            String r = makeCallString("http://localhost:8080/jcg/service/stream/no/int/list");
                            logger.info("Emitted thread: {}", Thread.currentThread().getName());
                            logger.info("Result: {}", r);
                            s.onNext(r);
                    } catch (Exception e) {
                            s.onError(e);
                    }
                    s.onComplete();
            }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread());
            Flowable result2 = Flowable.create((FlowableEmitter s) -> {
                    try {
                            String r = makeCallString("http://localhost:8080/jcg/service/stream/no/string/list");
                            logger.info("Emitted thread: {}", Thread.currentThread().getName());
                            logger.info("Result: {}", r);
                            s.onNext(r);
                    } catch (Exception e) {
                            s.onError(e);
                    }
                    s.onComplete();
            }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.newThread());
            Flowable.zip(result, result2, (s, s2) -> s + s2).subscribe(System.out::println);
    }

Output

2017-08-14 17:59:43 INFO  RxJavaExampleAdvanced:120 - Emitted thread: RxNewThreadScheduler-1
2017-08-14 17:59:43 INFO  RxJavaExampleAdvanced:121 - Result: ["1","2","3"]
2017-08-14 17:59:43 INFO  RxJavaExampleAdvanced:131 - Emitted thread: RxNewThreadScheduler-2
2017-08-14 17:59:43 INFO  RxJavaExampleAdvanced:132 - Result: ["test1","test2","test3"]
["1","2","3"]["test1","test2","test3"]

6. Streaming Results

Finally, let’s examine streaming the results of asynchronous API calls where the results are emitted as they are available. This example just builds on the concepts introduced previously with the key addition of the ObservableHttp calls that leverage a CloseableHttpAsyncClient.

 
    public static void streamObserable() throws URISyntaxException, IOException, InterruptedException {
            logger.info("Executing Streaming Observable Over Http");
            CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
            httpclient.start();

            ObservableHttp
                            .createRequest(HttpAsyncMethods.createGet("http://localhost:8080/jcg/service/stream/event2"),
                                            httpclient)
                            .toObservable().flatMap(new Func1<ObservableHttpResponse, rx.Observable>() {
                                    @Override
                                    public rx.Observable call(ObservableHttpResponse response) {
                                            return response.getContent().map(new Func1() {

                                                    @Override
                                                    public String call(byte[] bb) {
                                                            logger.info("timestamp inner "
                                                                            + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                                                            logger.info("counter: " + RxJavaExample3.counter++);
                                                            return new String(bb);
                                                    }

                                            });
                                    }
                            }).buffer(5, TimeUnit.SECONDS, 5, rx.schedulers.Schedulers.io())
                            .subscribeOn(rx.schedulers.Schedulers.io()).subscribe(new Action1<List>() {

                                    @Override
                                    public void call(List resp) {
                                            logger.info("timestamp " + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                                            logger.info(resp.toString());
                                    }
                            });

    }

Output

2017-08-14 18:06:20 INFO  RxJavaExampleAdvanced:143 - Executing Streaming Observable Over Http
2017-08-14 18:06:23 INFO  RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:23 PM
2017-08-14 18:06:23 INFO  RxJavaExampleAdvanced:159 - counter: 0
2017-08-14 18:06:25 INFO  RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:25 PM
2017-08-14 18:06:25 INFO  RxJavaExampleAdvanced:159 - counter: 1
2017-08-14 18:06:26 INFO  RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:26 PM
2017-08-14 18:06:26 INFO  RxJavaExampleAdvanced:171 - [data:Message 2, data:Message 1]
2017-08-14 18:06:27 INFO  RxJavaExampleAdvanced:157 - timestamp inner Aug 14, 2017 6:06:27 PM
2017-08-14 18:06:27 INFO  RxJavaExampleAdvanced:159 - counter: 2
2017-08-14 18:06:31 INFO  RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:31 PM
2017-08-14 18:06:31 INFO  RxJavaExampleAdvanced:171 - [data:Message 0]
2017-08-14 18:06:36 INFO  RxJavaExampleAdvanced:170 - timestamp Aug 14, 2017 6:06:36 PM
2017-08-14 18:06:36 INFO  RxJavaExampleAdvanced:171 - []

Here is the Spring Boot streaming resource that ties it together.

 
@Controller
@RequestMapping("/stream")
public class StreamController {

	private SseEmitter sseEmitter;

...

    @RequestMapping("/event2")
    public SseEmitter getRealTimeMessageAction2(HttpServletRequest request) {
            SseEmitter sseEmitter = new SseEmitter();
            runAsync(sseEmitter);
            return sseEmitter;
    }
    
    
    private void runAsync(SseEmitter sseEmitter) {
        for (int i = 0; i < 3; i++) {
                AsyncThread at = new AsyncThread();
                at.setEmitter(sseEmitter);
                at.setSleep((6 - (i * 2)) * 1000);
                at.setMessageId(i);
                at.start();
        }
    }

    private class AsyncThread extends Thread {
            private SseEmitter sseEmitter;
            private int sleep;
            private int id;

            public void setEmitter(SseEmitter sseEmitter) {
                    this.sseEmitter = sseEmitter;
            }

            public void setSleep(int sleep) {
                    this.sleep = sleep;
            }

            public void setMessageId(int id) {
                    this.id = id;
            }

            public void run() {
                    try {
                            try {
                                    Thread.sleep(this.sleep);
                                    logger.info("Timestamp:" + SimpleDateFormat.getDateTimeInstance().format(new Date().getTime()));
                                    this.sseEmitter.send("Message " + this.id);
                            } catch (InterruptedException e) {
                                    logger.error(e.getMessage());
                            }
                    } catch (IOException e) {
                            logger.error(e.getMessage());
                    }
            }
    }

7. Conclusion

In this example we looked at making asynchronous calls using RxJava, including calls to RESTful web services. We examined how certain usages make all asynchronous calls on a single thread and how to use Observables to make each call on a separate thread. We also looked at combining the results of multiple calls and finally streaming results of service calls.

8. Download the Source Code

Here we demonstrated how to use the basic RxJava operations.

Download
You can download the Eclipse project here: RxJavaExampleAdvanced
(No Ratings Yet)
1 Comment Views Tweet it!

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

 

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

 

and many more ....

 

Receive Java & Developer job alerts in your Area

 

1
Leave a Reply

avatar
1 Comment threads
0 Thread replies
0 Followers
 
Most reacted comment
Hottest comment thread
1 Comment authors
Jimis Recent comment authors
  Subscribe  
newest oldest most voted
Notify of
Jimis
Guest
Jimis

I didn’t find the rest web service. ALthough quite simple, can you provide it?