Core Java

Reactive Java (RxJava) Tutorial: Introduction

Reactive Java or RxJava is an implementation and enhancement of the observer pattern. It was intended for use in event driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. The key addition that RxJava provides in addition to the observer pattern is the ability to determine when event processing is complete or an error has occurred.

You can also check this tutorial in the following video:

Java Reactive Programming Tutorial – video

The primary components of reactive java are Observable / Flowable, Subscriber and operators. The idea is that an observable collects and emits actions to a subscriber who will perform an operation on the emitted items. Operators provide a way to manipulate the data that is emitted by an observable before it is sent to the subscriber for action. I will use the terms flowable and observable interchangeably in this example as they operate in a similar manner. The main difference is that a flowable will define a back pressure where an observable will not. The back pressure setting will define how downstream consumers handle emitted data.

1. Setup

We used Eclipse Neon, Java 8, Maven 3.3.9, Apache CLI 1.4 and RxJava 2.0.0.  At the time of this sample, I ran into issues using the last version, 2.0.7, with Eclipse as the source was not available.  This example uses a simple Java application to demonstrate Reactive functionality.  I used a command line switch with Apache’s cli tool to move back and forth between examples.

pom.xml

01
02
03
04
05
06
07
08
09
10
<dependency>
  <groupId>io.reactivex.rxjava2</groupId>
  <artifactId>rxjava</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>commons-cli</groupId>
  <artifactId>commons-cli</artifactId>
  <version>1.4</version>
</dependency>

2. Flowables and Subscribers

We will start with a simple example of gather, emit and act on a single item. This will consist of a Flowable,Consumer, Subscriber and the subscribe method. A Flowable is just like an Observable but it supports back pressure which is used to define how a consumer handles emitted items.

2.1 Basics

We will look at a few examples that emit and consume items with a Flowable and Consumer using the subscribe method. These examples require at least a high level understanding of lambdas and Java 8. As a quick primer for what lies ahead the lambda expressions used will provide a simplified notation for anonymous inner classes or passing functions as parameters to methods. In our first example we will compare the three different notations you will see in this article.

Anonymous inner implementation in lambda

1
2
3
4
5
6
Flowable.just("Hello world").subscribe(new Consumer() {
        @Override
        public void accept(String t) throws Exception {
                System.out.println(t);
        }
});

Lambda

1
Flowable.just("Hello world").subscribe(s -> System.out.println(t));

Method Reference

1
Flowable.just("Hello world").subscribe(System.out::println);

2.2 Subscriber Details

Now lets explore the subscriber a little more. The subscribe method on a Flowable provides the option to implement onNext, onError and onCompleted. In the first example we saw the subscriber implement the onNext method from the Consumer interface but now lets look at one that implements all three.

1
2
3
4
5
Flowable.fromArray(1, 2, 3, 4).subscribe(
        i -> System.out.printf("Entry %d\n", i),
        e -> System.err.printf("Failed to process: %s\n", e),
        () -> System.out.println("Done")
);

Some of the key interfaces to understand when using a Flowable are Consumer and Subscriber. When you subscribe to a flowable you can either pass a Subscriber or pass the individual Consumer implementations that represent onNext, onError and onComplete. These method implementations are optional and provide convenience for working with observables.

Consumer

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
//Consumer { void accept(T t); }
 
Flowable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
 @Override
 public void accept(Integer t) throws Exception {
   System.out.printf("Entry %d\n", t);
 }
 }, new Consumer<Throwable>() {
 @Override
 public void accept(Throwable t) throws Exception {
   System.err.printf("Failed to process: %s\n", t);
 }
 }, new Action() {
 @Override
 public void run() throws Exception {
   System.out.println("Done");
 }
});

Subscriber

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//Subscriber { void onNext(T t); void onError(Throwable t); void onComplete(); }
 
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
 
 @Override
 public void onSubscribe(Subscription s) {
  
 }
 
 @Override
 public void onNext(Integer t) {
 System.out.printf("Entry %d\n", t);
 }
 
 @Override
 public void onError(Throwable t) {
 System.err.printf("Failed to process: %s\n", t);
 }
 
 @Override
 public void onComplete() {
 System.out.println("Done");
 }
  
};
Flowable.fromArray(1,2,3,4).subscribe(subscriber);

2.3 Flowables

To create your own flowable you implement the FlowableOnSubscribe and provide the back pressure strategy. The back pressure strategy indicates how you intend to handle emitted items by either waiting, skipping, erring or holding the items for the consumer. In this implementation will use the onNext method to send a few integers and buffer the items until the downstream consumer is ready.

1
2
3
4
5
6
Flowable flowable = Flowable.create((FlowableEmitter emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println);

3. Transformations

There are many different transformation mechanisms and you can see a list here. Two that we will examine in this section are the map and flatMap. The map method is used for taking emitted items and modifying the data. With these methods you can change the data or even the type. The flatMap method is used for performing operations on emitted items and returning a new flowable/observable with new emitted items that can be processed by the subscriber. This means that a map is meant for processing a single emitted item while flatMap can process one or more emitted items as an flowable is designed to handle.  Let’s look at a couple of examples.

3.1 Map

As you can see in this example, the Flowable emits a String that it transforms to an int that it will send to the Subscriber.

1
Flowable.just("1").map(s -> Integer.parseInt(s)).subscribe(System.out::println);

3.2 FlatMap (ConcatMap which orders)

In this example we are taking the emitted items and applying the flatMap method to it which in turn responds with a new flowable of type Integer. This is the critical difference between map and flatMap that it returns the emitted results while flatMap returns a new flowable of the emitted type. This is powerful when you have complex transformations that need to process multiple emitted items the way a flowable will process them.

FlatMap Verbose

1
2
3
4
5
Observable.fromArray(1, 2, 3, 4).flatMap(new Function<Integer, ObservableSource>() {
        @Override
        public ObservableSource apply(Integer t) throws Exception {
            return Observable.just(t+50);
        }}).subscribe(System.out::println);   

FlatMap Change the Type

1
Observable.fromArray(1, 2, 3, 4).flatMap(t -> Observable.just(Integer.toString(t+50))).subscribe(s -> System.out.println(s));

FlatMap Flowable

1
Flowable.fromArray(1, 2, 3, 4).flatMap(t -> Flowable.just(t+50)).subscribe(System.out::println);

4. Schedulers

Schedulers provide asynchronous operations for the observables and define which thread they use. We will examine this topic in the next example when we look more closely at asynchronous calls but the next code snippet contains a simple example. In the example below the output written will be sent on a separate thread because of the subscribeOn method. By passing the Scheduler.io() parameter to this method a new thread will spin up to execute the write to output in a ThreadPool.

1
Flowable.just("Hello world").subscribeOn(Schedulers.io()).subscribe(System.out::println);

5. Summary

In this example we explored the basics of processing data and events with RxJava.  We saw how to build a flowable, apply transformations to the items emitted by the flowable and how to subscribe to flowables.  Two areas conducive to working with RxJava are in cases were your UI is processing events or if you need to process asynchronous calls to services.

In the next example, we will take a deeper dive into asynchronous requests and the benefits of leveraging RxJava. You can take a look here.

4. Download the Source Code

Here we demonstrated how to use the basic RxJava operations.

Download
You can download the Eclipse project here: Reactive Java (RxJava) Tutorial: Introduction

Andy Beck

Andy is a Senior Software Engineer that has sixteen years of experience working on web, desktop and mobile applications. He holds a Bachelor and Master's degree in Computer Science from the University of Virginia and George Washington University respectively. He specializes in working with Java web services and has significant experience working web applications, databases and continuous integration and deployments. He is currently working as a technical lead at a financial technology organization where he supports mobile application services in Java.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button