Home » Core Java » Java 9 Reactive Streams Example

About JJ

Jean-Jay Vester graduated from the Cape Peninsula University of Technology, Cape Town, in 2001 and has spent most of his career developing Java backend systems for small to large sized companies both sides of the equator. He has an abundance of experience and knowledge in many varied Java frameworks and has also acquired some systems knowledge along the way. Recently he has started developing his JavaScript skill set specifically targeting Angularjs and also bridged that skill to the backend with Nodejs.

Java 9 Reactive Streams Example

Spurred on by an abundance of non-standardized Reactive Stream implementations for the JVM (Akka streams, RxJava) and an increasing need to handle stream related problems in a reactive manner, JEP 266 was augmented with the need for an interoperable publish-subscribe framework that sought to standardize reactive streams solutions for the JVM.

This way library implementations now have a standard that warrants compliance for interoperability and a common base to derive from, the JVM.

This article will introduce us to the Java 9 Reactive Streams standard, core abstractions and a descriptive example that underscores how the abstractions in the standard cooperate to create reactive stream solutions.

1. Introduction

Typically systems have non-standardized (sometimes none) mechanisms for dealing with volatile load and thus, do not scale / degrade gracefully, in the face of such situations. The Reactive Streams processing initiative seeks to provide a means to deal with such scenarios by standardizing asynchronous stream processing with non-blocking back pressure. Java 9 introduces us to a lean set of interfaces aimed at standardizing reactive stream processing for the JVM, both for new implementations and existing ones.

By back pressure Reactive Streams tries to moderate the stream of data across asynchronous exchanges (in process or remote). To be more specific (Java – in process), exchanging data from one Thread to another needs to be a cooperative mechanism where the consuming component needs to indicate how much data it wants and the producing component needs to reciprocate up to that amount, so as not to overload the consumer. Strategies can then be designed to indicate how producers notify interested parties when the system is under strain and cannot handle any more data or possibly scale the system to meet said demands.

The efforts do not only target run time platforms but also network protocols (particularly for distributed systems). A prime example is TCP which has long since had the facilities for back pressure. Reactive streams is an initiative to extend this into our application domain so as to fluently apply back pressure and have this cascade down into the network stack and ultimately on to other remote systems.

2. Technologies used

The example code in this article was built and run using:

  • Java 9 (jdk-9+180)
  • Maven 3.3.9 (3.3.x will do fine)
  • Eclipse Oxygen (4.7.0)
  • Ubuntu 16.04 (Windows, Mac or Linux will do fine)

3. Setup

Regarding the technologies used, all are required except for Eclipse, as viewing the code can be done in any text editor. Compiling and running the program can be done via the following:

  • compile: mvn clean install package
  • run: navigate into the target folder of the downloaded project root folder and execute the following:  java -jar reactive_streams_example-0.0.1-SNAPSHOT.jar

To setup the required technologies one can refer back to a previous article in this series (see section 3. Setup), which details all the steps required to set up your environment.

4. How it works

The following diagram illustrates how the Java 9 Reactive Streams abstractions cooperate to deliver a reactive solution. The image illustrates a non reactive relationship (top) between two components and a reactive / cooperative relationship (bottom) between two components.

Non reactive and reactive relationship between components

On the top we can see that a push of data that is not throttled in terms of stable system throughput can exhaust the Subscribers buffers, whereas below the Subscriber will indicate to the Publisher how much data it wants being an indication of how much it can handle leading to a potentially more stable solution in the face of volatile load. Not shown in the bottom graphic is the details of the mediation between Subscriber and Publisher, which concretely is mediated via a Subscription abstraction, this was intentional to better get the concept across.

Whats important to take away from this is that a Reactive Stream solution can self toggle it’s perceived behavior from push based to pull based as the need arises.

5. The API

The Java 9 Reactive Streams standard defines a set of abstractions which specifies the standard for Reactive Stream processing and to some extent brings utility in implementing Reactive Streams solutions. These abstractions can be found within the module java.base and the package java.util.concurrent meaning implementations now, as of Java 9, need no further dependencies to implement Reactive Streams solutions.

There is also a Test compatibility kit available for potential implementations to be tested against, to ensure compliance with the specifications provided by the JDK.

Core to the standard are the set of interfaces contained with the Flow class which resides in above-mentioned module and package.

5.1 Flow.Processor<T,R>

An interface specifying that implementing classes act as both producers and consumers of data in a reactive stream.

  • T – the subscribed item type
  • R – the published item type

5.2 Flow.Publisher<T>

A Functional interface that produces data for consumption by Subscribers. Stream communication (data, error, completion) with Subscribers is facilitated via the Subscription abstraction. A new Subscriber will subscribe to the Publisher which will create a unique Subscription per Subscriber. The Subscription will serve as the mediation between production and consumption of data.

  • T – the published item type

5.3 Flow.Subscriber<T>

An interface specifying the consumption of data, completion indicators and errors. Their is an implied ordering in the invocation of this API, meaning subscription notifications will occur before any item is consumed which occurs chronologically before completion of the stream or of course any errors.

  • T – the subscribed item type

5.4 Flow.Subscription

An interface specifying the contract between the Publisher and the Subscriber. Methods on this interface are intended to be invoked only by the Subscriber.

6. The Program Code

The sample program consists of an attempt to regulate the stream flow between 3 participating components, namely QueueWrapper, AccountValidator and Mutator which all contribute to validating an incoming Transaction and recording the account mutation which can be one of either WITHDRAW or DEPOSIT.

I say attempt because getting the stream to regulate itself can be done in so many different ways, the possibilities are endless and can be quite sophisticated. This example attempts to regulate the stream based on downstream capacity of the Subscriber buffers but could also be based on time of day, task latency or other hardware / network metrics etc.

The following diagram depicts the flow:

Reactive Stream Flow

6.1. QueueWrapper

The QueueWrapper forms the entry point Publisher and wraps a fake Queue, but this could be ActiveMQ or RabbitMQ etc. in real life. A ScheduledExecutor service is supplied which periodically polls the fake Queue for data based on downstream demand / capacity. The items are then de-queued from the Queue and submitted to the AccountValidator by way of the ReceivedSubscription.

QueueWrapper snippet

public final class QueueWrapper implements Publisher<UnValidatedTransaction> {

    // Fake backing queue
    private final Supplier<UnValidatedTransaction> queue;
    private final ScheduledExecutorService execService;

    private ReceivedSubscription receivedSubscription;

    public QueueWrapper(final ScheduledExecutorService execService, final Supplier<UnValidatedTransaction> queue) {
        Objects.requireNonNull(execService);
        Objects.requireNonNull(queue);

        this.queue = queue;
        this.execService = execService;
    }

    // Initialize scheduled Threading to poll the fake queue.
    public void init() {
        this.execService.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                QueueWrapper.this.receivedSubscription.publish();
            }
        }, Constants.Threading.SCHEDULE_DELAY, Constants.Threading.SCHEDULE_DELAY, TimeUnit.MILLISECONDS);
    }

    // Convenience method to shutdown the flow pipeline
    public void stop() {
        this.receivedSubscription.cancel();
        while (!Thread.currentThread().isInterrupted() && this.receivedSubscription.demand.get() > 0) {
            try {
                Thread.sleep(Constants.Threading.AWAIT_TERMINATION);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void subscribe(final Subscriber<? super UnValidatedTransaction> subscriber) {
        // Set the downstream Subscription with the downstream AccountValidator
        // Processor
        this.receivedSubscription = new ReceivedSubscription(this.queue, subscriber);
        // Call back upon subscription with the downstream AccountValidator Processor
        subscriber.onSubscribe(this.receivedSubscription);
    }

    static final class ReceivedSubscription implements Subscription {

        private final Subscriber<? super UnValidatedTransaction> subscriber;
        private final Supplier<UnValidatedTransaction> queue;
        private final AtomicLong demand;
        private final AtomicBoolean cancelled;

        private ReceivedSubscription(final Supplier<UnValidatedTransaction> queue, final Subscriber<? super UnValidatedTransaction> subscriber) {
            this.queue = queue;
            this.subscriber = subscriber;
            this.demand = new AtomicLong();
            this.cancelled = new AtomicBoolean(false);
        }

        @Override
        public void cancel() {
            // Upon cancellation set flag to help in request decision making
            this.cancelled.set(true);
            // Complete the subscriber AccountValidator Processor
            this.subscriber.onComplete();
        }

        @Override
        public void request(final long n) {
            // Set demand accordingly
            this.demand.set(this.cancelled.get() ? 0 : n);

            System.out.printf("Thread %s : Downstream demand is %d\n", Thread.currentThread().getName(), n);
        }

        private void publish() {
            // As long as we have demand poll queue and send items
            while (this.demand.getAndDecrement() > 0) {
                final UnValidatedTransaction unvalidatedTransaction = this.queue.get();

                // Append only persistence simulated
                record(unvalidatedTransaction);

                this.subscriber.onNext(unvalidatedTransaction);
            }
        }

        private void record(final Transaction unvalidatedTransaction) {
            assert !Objects.isNull(unvalidatedTransaction);

            System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), unvalidatedTransaction);
        }
    }
}

6.2. AccountValidator

The AccountValidator is a Processor that simulates a high latency validation call to validate the incoming Transaction. Once validated the Transaction is submitted to the Mutator Subscriber via the ValidatedSubscription interface which will then record the mutation.

AccountValidator snippet

public final class AccountValidator implements Processor<UnValidatedTransaction, ValidatedTransaction> {

    private final ExecutorService execService;
    // Buffer to store items to process
    private final Queue<ValidatedTransaction> buffer;

    private Subscription receivedSubscription;
    private ValidatedSubscription validatedSubscription;

    public AccountValidator(final ExecutorService execService) {
        this.execService = execService;
        this.buffer = new ArrayBlockingQueue<>(Constants.SUBSCRIBER_BUFFER_SIZE);
    }

    @Override
    public void onComplete() {
        // On completion cancel the downstream subscription with the Mutator Subscriber
        this.validatedSubscription.cancel();
    }

    @Override
    public void onError(final Throwable throwable) {
        throwable.printStackTrace();
        // On error cancel the downstream subscription with the Mutator Subscriber
        this.validatedSubscription.cancel();
    }

    @Override
    public void onNext(final UnValidatedTransaction unvalidatedTransaction) {
        // For each new item from our upstream QueueWrapper Publisher
        this.validatedSubscription.publish(unvalidatedTransaction);
    }

    @Override
    public void onSubscribe(final Subscription receivedSubscription) {
        // Upon subscription set the subscription with the upstream QueueWrapper
        // Publisher
        this.receivedSubscription = receivedSubscription;
        // Request items that we have capacity for from the upstream QueueWrapper
        // Publisher
        this.receivedSubscription.request(Constants.SUBSCRIBER_BUFFER_SIZE);
    }

    @Override
    public void subscribe(final Subscriber<? super ValidatedTransaction> subscriber) {
        // Create new downstream subscription from subscription request from Mutator
        // Subscriber
        this.validatedSubscription = new ValidatedSubscription(this, subscriber);
        // Call back into the Mutator upon subscription
        subscriber.onSubscribe(this.validatedSubscription);
    }

    // Fake append only persistence for dummy event log
    private void record(final ValidatedTransaction validatedTransaction) {
        assert Objects.isNull(validatedTransaction);

        System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), validatedTransaction);
    }

    // Downstream Subscription with Mutator Subscriber
    private static final class ValidatedSubscription implements Subscription {

        private final AccountValidator accountValidator;
        private final Subscriber<? super ValidatedTransaction> subscriber;
        private final ExecutorService execService;
        private final Queue<ValidatedTransaction> buffer;
        private final AtomicLong demand;
        private final AtomicBoolean cancelled;

        private ValidatedSubscription(final AccountValidator accountValidator, final Subscriber<? super ValidatedTransaction> subscriber) {
            this.subscriber = subscriber;
            this.execService = accountValidator.execService;
            this.buffer = accountValidator.buffer;
            this.accountValidator = accountValidator;
            this.demand = new AtomicLong();
            this.cancelled = new AtomicBoolean(false);
        }

        @Override
        public void cancel() {
            // Indicate this subscription is cancelled and call onComplete of downstream
            // Mutator Subscriber
            this.cancelled.set(true);
            this.subscriber.onComplete();
        }

        @Override
        public void request(final long n) {

            // Set demand of downstream Mutator Subscriber accordingly
            this.demand.set(this.cancelled.get() ? 0 : n);

            // Execute asynchronously work to be down for sending transactions to Mutator
            // Subscriber
            this.execService.execute(() -> {
                // If their is demand and our buffer has items them empty the buffer until
                // demand is met or buffer is drained.
                while (this.demand.get() > 0 && !this.buffer.isEmpty()) {
                    this.demand.decrementAndGet();

                    final ValidatedTransaction validatedTransaction = this.buffer.poll();
                    ValidatedSubscription.this.subscriber.onNext(validatedTransaction);
                }

                System.out.printf("Thread %s : AccountValidator requesting %d : Buffer size %d\n", Thread.currentThread().getName(),
                        (Constants.SUBSCRIBER_BUFFER_SIZE - this.buffer.size()), this.buffer.size());

                this.accountValidator.receivedSubscription.request(Constants.SUBSCRIBER_BUFFER_SIZE - this.buffer.size());
            });
        }

        private void publish(final UnValidatedTransaction unvalidatedTransaction) {
            assert !Objects.isNull(unvalidatedTransaction);

            // Execute asynchronously validation mapping
            this.execService.execute(() -> {
                // Simulate high latency transaction validation call
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                final ValidatedTransaction validatedTransaction = new ValidatedTransaction(unvalidatedTransaction.getAccount(),
                        unvalidatedTransaction.getAmount(), unvalidatedTransaction.getMutation());

                this.accountValidator.record(validatedTransaction);

                // If we have immediate demand then dispatch to downstream Mutator Subscriber
                // otherwise store in our buffer until demand
                if (this.demand.getAndDecrement() > 0) {
                    ValidatedSubscription.this.subscriber.onNext(validatedTransaction);
                } else {
                    this.buffer.offer(validatedTransaction);
                }
            });
        }
    }
}

6.3. Mutator

The Mutator is a Subscriber which subscribes to ValidatedTransaction data from the AccountValidator and persists the entry into a fake append-only event log.

Mutator snippet

public final class Mutator implements Subscriber<ValidatedTransaction> {

    private final AtomicInteger counter;
    private final ExecutorService execService;
    private Subscription subscription;
    private AtomicBoolean cancelled;

    public Mutator(final ExecutorService execService) {
        this.counter = new AtomicInteger();
        this.execService = execService;
        this.cancelled = new AtomicBoolean();
    }

    @Override
    public void onComplete() {
        // On completion set cancelled flag to aid in request decisions from upstream
        // AccountValidator Publisher
        this.cancelled.set(true);
    }

    @Override
    public void onError(final Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onNext(final ValidatedTransaction validatedTransaction) {
        // Execute asynchronously any append only mutation entries and requests for
        // additional items
        this.execService.execute(() -> {
            record(new MutatedTransaction(validatedTransaction.getAccount(), validatedTransaction.getAmount(), validatedTransaction.getMutation()));

            this.counter.incrementAndGet();
            if (this.counter.get() > (Constants.SUBSCRIBER_BUFFER_SIZE - 1)) {
                this.counter.set(0);
                request();
            }
        });
    }

    private void request() {
        System.out.printf("Thread %s : Mutator requesting %d\n", Thread.currentThread().getName(), Constants.SUBSCRIBER_BUFFER_SIZE);

        // Request more items from AccountValidator Processor provided we are not
        // cancelled
        this.subscription.request(this.cancelled.get() ? 0 : Constants.SUBSCRIBER_BUFFER_SIZE);
    }

    @Override
    public void onSubscribe(final Subscription subscription) {
        // Set Subscription with AccountValidator Processor
        this.subscription = subscription;

        // Request more items from AccountValidator Publisher provided we are not
        // cancelled
        this.subscription.request(this.cancelled.get() ? 0 : Constants.SUBSCRIBER_BUFFER_SIZE);
    }

    // Simulate append only persistence for mutation
    private void record(final MutatedTransaction mutatedTransaction) {
        assert Objects.isNull(mutatedTransaction);

        System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), mutatedTransaction);
    }
}

6.4 Running the program

You can run the program by navigating to the project root folder of the download and executing the following:

  • Build: mvn clean install package
  • Run: navigate to the target folder with the project root folder and execute the following:

Sample output from running the program on my machine.

Sample output

Thread pool-2-thread-2 : Mutator requesting 5
Thread pool-2-thread-2 : AccountValidator requesting 5 : Buffer size 0
Thread pool-2-thread-2 : Downstream demand is 5
Thread pool-3-thread-1 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: RECEIVED Mutation: WITHDRAW
Thread pool-3-thread-1 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: RECEIVED Mutation: WITHDRAW
Thread pool-3-thread-1 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: RECEIVED Mutation: WITHDRAW
Thread pool-2-thread-1 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-2 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: VALIDATED Mutation: WITHDRAW
Thread pool-2-thread-1 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: VALIDATED Mutation: WITHDRAW
Thread pool-2-thread-2 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-2 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-2 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: MUTATED Mutation: WITHDRAW
Thread pool-2-thread-2 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: MUTATED Mutation: WITHDRAW
Thread pool-2-thread-2 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: VALIDATED Mutation: WITHDRAW
Thread pool-2-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: MUTATED Mutation: WITHDRAW
Thread pool-2-thread-1 : Mutator requesting 5
Thread pool-2-thread-1 : AccountValidator requesting 5 : Buffer size 0
Thread pool-2-thread-1 : Downstream demand is 5
Thread pool-3-thread-1 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: RECEIVED Mutation: DEPOSIT
Thread pool-3-thread-1 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: RECEIVED Mutation: WITHDRAW
Thread pool-2-thread-2 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-2 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: VALIDATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: MUTATED Mutation: DEPOSIT
Thread pool-2-thread-2 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: VALIDATED Mutation: WITHDRAW
Thread pool-2-thread-2 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: MUTATED Mutation: WITHDRAW

7. Summary

In this example tutorial we briefly covered the origins of the Reactive Streams standard, how it works and briefly covered the API introduced in Java 9 as a means to standardize Reactive Stream processing. We also provided a descriptive example that leveraged the standard to demonstrate some of the core abstractions in the Java 9 API.

What I have found doing this exercise is that abstractions on hand are very generic and intentionally so, the level of abstraction would need some lifting for it to be more intuitive for specific domains. Also conventional methods of A.C.I.D. in Reactive Streams, particularly by way of asynchronous exchanges do not work and new methods of persistence / consistency need to be implemented when using designs like this.

In the example code I tried to implement a form of primitive (example) append only event sourcing, where each component (QueueWrapper, AccountValidator and Mutator) each stores their transactions as a sequence of events in some log (simulated by simply writing to console).

8. Download the Source Code

This was a Java 9 Reactive Streams Example.

Download
You can download the full source code of this example here: Java 9 Reactive Streams Example
(No Ratings Yet)
1 Comment Views Tweet it!

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

 

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

 

and many more ....

 

Receive Java & Developer job alerts in your Area

 

1
Leave a Reply

avatar
1 Comment threads
0 Thread replies
0 Followers
 
Most reacted comment
Hottest comment thread
1 Comment authors
Mr P. Recent comment authors
  Subscribe  
newest oldest most voted
Notify of
Mr P.
Guest
Mr P.

Why do you invoke ‘mvn clean install package’ when ‘install’ actually requires ‘package’ to be performed?