Home » Core Java » Reactive Java (RxJava) Tutorial: Introduction

About Andy Beck

Andy Beck
Andy is a Senior Software Engineer that has sixteen years of experience working on web, desktop and mobile applications. He holds a Bachelor and Master's degree in Computer Science from the University of Virginia and George Washington University respectively. He specializes in working with Java web services and has significant experience working web applications, databases and continuous integration and deployments. He is currently working as a technical lead at a financial technology organization where he supports mobile application services in Java.

Reactive Java (RxJava) Tutorial: Introduction

Reactive Java or RxJava is an implementation and enhancement of the observer pattern. It was intended for use in event driven schemes where nesting synchronous or asynchronous callback methods becomes overly complex. The key addition that RxJava provides in addition to the observer pattern is the ability to determine when event processing is complete or an error has occurred.

The primary components of reactive java are Observable / Flowable, Subscriber and operators. The idea is that an observable collects and emits actions to a subscriber who will perform an operation on the emitted items. Operators provide a way to manipulate the data that is emitted by an observable before it is sent to the subscriber for action. I will use the terms flowable and observable interchangeably in this example as they operate in a similar manner. The main difference is that a flowable will define a back pressure where an observable will not. The back pressure setting will define how downstream consumers handle emitted data.

1. Setup

We used Eclipse Neon, Java 8, Maven 3.3.9, Apache CLI 1.4 and RxJava 2.0.0.  At the time of this sample, I ran into issues using the last version, 2.0.7, with Eclipse as the source was not available.  This example uses a simple Java application to demonstrate Reactive functionality.  I used a command line switch with Apache’s cli tool to move back and forth between examples.

pom.xml

<dependency>
  <groupId>io.reactivex.rxjava2</groupId>
  <artifactId>rxjava</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>commons-cli</groupId>
  <artifactId>commons-cli</artifactId>
  <version>1.4</version>
</dependency>

2. Flowables and Subscribers

We will start with a simple example of gather, emit and act on a single item. This will consist of a Flowable,Consumer, Subscriber and the subscribe method. A Flowable is just like an Observable but it supports back pressure which is used to define how a consumer handles emitted items.

2.1 Basics

We will look at a few examples that emit and consume items with a Flowable and Consumer using the subscribe method. These examples require at least a high level understanding of lambdas and Java 8. As a quick primer for what lies ahead the lambda expressions used will provide a simplified notation for anonymous inner classes or passing functions as parameters to methods. In our first example we will compare the three different notations you will see in this article.

Anonymous inner implementation in lambda

    Flowable.just("Hello world").subscribe(new Consumer() {
            @Override
            public void accept(String t) throws Exception {
                    System.out.println(t);
            }
    });

Lambda

    
    Flowable.just("Hello world").subscribe(s -> System.out.println(t));

Method Reference

Flowable.just("Hello world").subscribe(System.out::println);

2.2 Subscriber Details

Now lets explore the subscriber a little more. The subscribe method on a Flowable provides the option to implement onNext, onError and onCompleted. In the first example we saw the subscriber implement the onNext method from the Consumer interface but now lets look at one that implements all three.

    Flowable.fromArray(1, 2, 3, 4).subscribe(
            i -> System.out.printf("Entry %d\n", i),
            e -> System.err.printf("Failed to process: %s\n", e),
            () -> System.out.println("Done")
    );

Some of the key interfaces to understand when using a Flowable are Consumer and Subscriber. When you subscribe to a flowable you can either pass a Subscriber or pass the individual Consumer implementations that represent onNext, onError and onComplete. These method implementations are optional and provide convenience for working with observables.

Consumer

//Consumer { void accept(T t); }

Flowable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
 @Override
 public void accept(Integer t) throws Exception {
   System.out.printf("Entry %d\n", t);
 }
 }, new Consumer<Throwable>() {
 @Override
 public void accept(Throwable t) throws Exception {
   System.err.printf("Failed to process: %s\n", t);
 }
 }, new Action() {
 @Override
 public void run() throws Exception {
   System.out.println("Done");
 }
});

Subscriber

//Subscriber { void onNext(T t); void onError(Throwable t); void onComplete(); }

Subscriber<Integer> subscriber = new Subscriber<Integer>(){

 @Override
 public void onSubscribe(Subscription s) {
 
 }

 @Override
 public void onNext(Integer t) {
 System.out.printf("Entry %d\n", t);
 }

 @Override
 public void onError(Throwable t) {
 System.err.printf("Failed to process: %s\n", t);
 }

 @Override
 public void onComplete() {
 System.out.println("Done");
 }
 
};
Flowable.fromArray(1,2,3,4).subscribe(subscriber);

2.3 Flowables

To create your own flowable you implement the FlowableOnSubscribe and provide the back pressure strategy. The back pressure strategy indicates how you intend to handle emitted items by either waiting, skipping, erring or holding the items for the consumer. In this implementation will use the onNext method to send a few integers and buffer the items until the downstream consumer is ready.

Flowable flowable = Flowable.create((FlowableEmitter emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println);

3. Transformations

There are many different transformation mechanisms and you can see a list at, http://reactivex.io/documentation/operators.html#alphabetical. Two that we will examine in this section are the map and flatMap. The map method is used for taking emitted items and modifying the data. With these methods you can change the data or even the type. The flatMap method is used for performing operations on emitted items and returning a new flowable/observable with new emitted items that can be processed by the subscriber. This means that a map is meant for processing a single emitted item while flatMap can process one or more emitted items as an flowable is designed to handle.  Lets look at a couple examples.

3.1 Map

As you can see in this example, the Flowable emits a String that it transforms to an int that it will send to the Subscriber.

    Flowable.just("1").map(s -> Integer.parseInt(s)).subscribe(System.out::println);

3.2 FlatMap (ConcatMap which orders)

In this example we are taking the emitted items and applying the flatMap method to it which in turn responds with a new flowable of type Integer. This is the critical difference between map and flatMap that it returns the emitted results while flatMap returns a new flowable of the emitted type. This is powerful when you have complex transformations that need to process multiple emitted items the way a flowable will process them.

FlatMap Verbose

    Observable.fromArray(1, 2, 3, 4).flatMap(new Function<Integer, ObservableSource>() {
            @Override
            public ObservableSource apply(Integer t) throws Exception {
                return Observable.just(t+50);
            }}).subscribe(System.out::println);    

FlatMap Change the Type

    Observable.fromArray(1, 2, 3, 4).flatMap(t -> Observable.just(Integer.toString(t+50))).subscribe(s -> System.out.println(s));

FlatMap Flowable

    Flowable.fromArray(1, 2, 3, 4).flatMap(t -> Flowable.just(t+50)).subscribe(System.out::println);

4. Schedulers

Schedulers provide asynchronous operations for the observables and define which thread they use. We will examine this topic in the next example when we look more closely at asynchronous calls but the next code snippet contains a simple example. In the example below the output written will be sent on a separate thread because of the subscribeOn method. By passing the Scheduler.io() parameter to this method a new thread will spin up to execute the write to output in a ThreadPool.

Flowable.just("Hello world").subscribeOn(Schedulers.io()).subscribe(System.out::println);

5. Conclusion

In this example we explored the basics of processing data and events with RxJava.  We saw how to build a flowable, apply transformations to the items emitted by the flowable and how to subscribe to flowables.  Two areas conducive to working with RxJava are in cases were your UI is processing events or if you need to process asynchronous calls to services. In the next example we will take a deeper dive into asynchronous requests and the benefits of leveraging RxJava.

4. Download the Source Code

Here we demonstrated how to use the basic RxJava operations.

Download
You can download the Eclipse project here: RxJavaExample

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

 

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

 

and many more ....

 

Receive Java & Developer job alerts in your Area

 

Leave a Reply

Be the First to Comment!

Notify of
avatar
wpDiscuz