Reactive Java (RxJava) Tutorial: Introduction
Reactive Java or RxJava is an implementation and enhancement of the observer pattern. It was intended for use in event driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. The key addition that RxJava provides in addition to the observer pattern is the ability to determine when event processing is complete or an error has occurred.
You can also check this tutorial in the following video:
The primary components of reactive java are Observable
/ Flowable
, Subscriber
and operators. The idea is that an observable collects and emits actions to a subscriber who will perform an operation on the emitted items. Operators provide a way to manipulate the data that is emitted by an observable before it is sent to the subscriber for action. I will use the terms flowable and observable interchangeably in this example as they operate in a similar manner. The main difference is that a flowable will define a back pressure where an observable will not. The back pressure setting will define how downstream consumers handle emitted data.
1. Setup
We used Eclipse Neon, Java 8, Maven 3.3.9, Apache CLI 1.4 and RxJava 2.0.0. At the time of this sample, I ran into issues using the last version, 2.0.7, with Eclipse as the source was not available. This example uses a simple Java application to demonstrate Reactive functionality. I used a command line switch with Apache’s cli tool to move back and forth between examples.
pom.xml
01 02 03 04 05 06 07 08 09 10 | < dependency > < groupId >io.reactivex.rxjava2</ groupId > < artifactId >rxjava</ artifactId > < version >2.0.0</ version > </ dependency > < dependency > < groupId >commons-cli</ groupId > < artifactId >commons-cli</ artifactId > < version >1.4</ version > </ dependency > |
2. Flowables and Subscribers
We will start with a simple example of gather, emit and act on a single item. This will consist of a Flowable
,Consumer
, Subscriber
and the subscribe method. A Flowable
is just like an Observable
but it supports back pressure which is used to define how a consumer handles emitted items.
2.1 Basics
We will look at a few examples that emit and consume items with a Flowable
and Consumer
using the subscribe method. These examples require at least a high level understanding of lambdas and Java 8. As a quick primer for what lies ahead the lambda expressions used will provide a simplified notation for anonymous inner classes or passing functions as parameters to methods. In our first example we will compare the three different notations you will see in this article.
Anonymous inner implementation in lambda
1 2 3 4 5 6 | Flowable.just( "Hello world" ).subscribe( new Consumer() { @Override public void accept(String t) throws Exception { System.out.println(t); } }); |
Lambda
1 | Flowable.just( "Hello world" ).subscribe(s -> System.out.println(t)); |
Method Reference
1 | Flowable.just( "Hello world" ).subscribe(System.out::println); |
2.2 Subscriber Details
Now lets explore the subscriber a little more. The subscribe method on a Flowable
provides the option to implement onNext
, onError
and onCompleted
. In the first example we saw the subscriber implement the onNext
method from the Consumer
interface but now lets look at one that implements all three.
1 2 3 4 5 | Flowable.fromArray( 1 , 2 , 3 , 4 ).subscribe( i -> System.out.printf( "Entry %d\n" , i), e -> System.err.printf( "Failed to process: %s\n" , e), () -> System.out.println( "Done" ) ); |
Some of the key interfaces to understand when using a Flowable
are Consumer
and Subscriber
. When you subscribe to a flowable you can either pass a Subscriber
or pass the individual Consumer
implementations that represent onNext, onError and onComplete. These method implementations are optional and provide convenience for working with observables.
Consumer
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 | //Consumer { void accept(T t); } Flowable.fromArray( 1 , 2 , 3 , 4 ).subscribe( new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.printf( "Entry %d\n" , t); } }, new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.err.printf( "Failed to process: %s\n" , t); } }, new Action() { @Override public void run() throws Exception { System.out.println( "Done" ); } }); |
Subscriber
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | //Subscriber { void onNext(T t); void onError(Throwable t); void onComplete(); } Subscriber<Integer> subscriber = new Subscriber<Integer>(){ @Override public void onSubscribe(Subscription s) { } @Override public void onNext(Integer t) { System.out.printf( "Entry %d\n" , t); } @Override public void onError(Throwable t) { System.err.printf( "Failed to process: %s\n" , t); } @Override public void onComplete() { System.out.println( "Done" ); } }; Flowable.fromArray( 1 , 2 , 3 , 4 ).subscribe(subscriber); |
2.3 Flowables
To create your own flowable you implement the FlowableOnSubscribe
and provide the back pressure strategy. The back pressure strategy indicates how you intend to handle emitted items by either waiting, skipping, erring or holding the items for the consumer. In this implementation will use the onNext method to send a few integers and buffer the items until the downstream consumer is ready.
1 2 3 4 5 6 | Flowable flowable = Flowable.create((FlowableEmitter emitter) -> { emitter.onNext( 1 ); emitter.onNext( 2 ); emitter.onComplete(); }, BackpressureStrategy.BUFFER); flowable.subscribe(System.out::println); |
3. Transformations
There are many different transformation mechanisms and you can see a list here. Two that we will examine in this section are the map and flatMap. The map method is used for taking emitted items and modifying the data. With these methods you can change the data or even the type. The flatMap method is used for performing operations on emitted items and returning a new flowable/observable with new emitted items that can be processed by the subscriber. This means that a map is meant for processing a single emitted item while flatMap can process one or more emitted items as an flowable is designed to handle. Let’s look at a couple of examples.
3.1 Map
As you can see in this example, the Flowable emits a String that it transforms to an int that it will send to the Subscriber.
1 | Flowable.just( "1" ).map(s -> Integer.parseInt(s)).subscribe(System.out::println); |
3.2 FlatMap (ConcatMap which orders)
In this example we are taking the emitted items and applying the flatMap method to it which in turn responds with a new flowable of type Integer. This is the critical difference between map and flatMap that it returns the emitted results while flatMap returns a new flowable of the emitted type. This is powerful when you have complex transformations that need to process multiple emitted items the way a flowable will process them.
FlatMap Verbose
1 2 3 4 5 | Observable.fromArray( 1 , 2 , 3 , 4 ).flatMap( new Function<Integer, ObservableSource>() { @Override public ObservableSource apply(Integer t) throws Exception { return Observable.just(t+ 50 ); }}).subscribe(System.out::println); |
FlatMap Change the Type
1 | Observable.fromArray( 1 , 2 , 3 , 4 ).flatMap(t -> Observable.just(Integer.toString(t+ 50 ))).subscribe(s -> System.out.println(s)); |
FlatMap Flowable
1 | Flowable.fromArray( 1 , 2 , 3 , 4 ).flatMap(t -> Flowable.just(t+ 50 )).subscribe(System.out::println); |
4. Schedulers
Schedulers provide asynchronous operations for the observables and define which thread they use. We will examine this topic in the next example when we look more closely at asynchronous calls but the next code snippet contains a simple example. In the example below the output written will be sent on a separate thread because of the subscribeOn
method. By passing the Scheduler.io()
parameter to this method a new thread will spin up to execute the write to output in a ThreadPool
.
1 | Flowable.just( "Hello world" ).subscribeOn(Schedulers.io()).subscribe(System.out::println); |
5. Summary
In this example we explored the basics of processing data and events with RxJava. We saw how to build a flowable, apply transformations to the items emitted by the flowable and how to subscribe to flowables. Two areas conducive to working with RxJava are in cases were your UI is processing events or if you need to process asynchronous calls to services.
In the next example, we will take a deeper dive into asynchronous requests and the benefits of leveraging RxJava. You can take a look here.
4. Download the Source Code
Here we demonstrated how to use the basic RxJava operations.