Java 9 Reactive Streams Example
Spurred on by an abundance of non-standardized Reactive Stream implementations for the JVM (Akka streams, RxJava) and an increasing need to handle stream related problems in a reactive manner, JEP 266 was augmented with the need for an interoperable publish-subscribe framework that sought to standardize reactive streams solutions for the JVM.
This way library implementations now have a standard that warrants compliance for interoperability and a common base to derive from, the JVM.
This article will introduce us to the Java 9 Reactive Streams standard, core abstractions and a descriptive example that underscores how the abstractions in the standard cooperate to create reactive stream solutions.
1. Introduction
Typically systems have non-standardized (sometimes none) mechanisms for dealing with volatile load and thus, do not scale / degrade gracefully, in the face of such situations. The Reactive Streams processing initiative seeks to provide a means to deal with such scenarios by standardizing asynchronous stream processing with non-blocking back pressure. Java 9 introduces us to a lean set of interfaces aimed at standardizing reactive stream processing for the JVM, both for new implementations and existing ones.
By back pressure Reactive Streams tries to moderate the stream of data across asynchronous exchanges (in process or remote). To be more specific (Java – in process), exchanging data from one Thread to another needs to be a cooperative mechanism where the consuming component needs to indicate how much data it wants and the producing component needs to reciprocate up to that amount, so as not to overload the consumer. Strategies can then be designed to indicate how producers notify interested parties when the system is under strain and cannot handle any more data or possibly scale the system to meet said demands.
The efforts do not only target run time platforms but also network protocols (particularly for distributed systems). A prime example is TCP which has long since had the facilities for back pressure. Reactive streams is an initiative to extend this into our application domain so as to fluently apply back pressure and have this cascade down into the network stack and ultimately on to other remote systems.
2. Technologies used
The example code in this article was built and run using:
- Java 9 (jdk-9+180)
- Maven 3.3.9 (3.3.x will do fine)
- Eclipse Oxygen (4.7.0)
- Ubuntu 16.04 (Windows, Mac or Linux will do fine)
3. Setup
Regarding the technologies used, all are required except for Eclipse, as viewing the code can be done in any text editor. Compiling and running the program can be done via the following:
- compile:
mvn clean install package
- run: navigate into the
target
folder of the downloaded project root folder and execute the following:java -jar reactive_streams_example-0.0.1-SNAPSHOT.jar
To setup the required technologies one can refer back to a previous article in this series (see section 3. Setup), which details all the steps required to set up your environment.
4. How it works
The following diagram illustrates how the Java 9 Reactive Streams abstractions cooperate to deliver a reactive solution. The image illustrates a non reactive relationship (top) between two components and a reactive / cooperative relationship (bottom) between two components.
On the top we can see that a push of data that is not throttled in terms of stable system throughput can exhaust the Subscribers buffers, whereas below the Subscriber will indicate to the Publisher how much data it wants being an indication of how much it can handle leading to a potentially more stable solution in the face of volatile load. Not shown in the bottom graphic is the details of the mediation between Subscriber and Publisher, which concretely is mediated via a Subscription abstraction, this was intentional to better get the concept across.
Whats important to take away from this is that a Reactive Stream solution can self toggle it’s perceived behavior from push based to pull based as the need arises.
5. The API
The Java 9 Reactive Streams standard defines a set of abstractions which specifies the standard for Reactive Stream processing and to some extent brings utility in implementing Reactive Streams solutions. These abstractions can be found within the module java.base
and the package java.util.concurrent
meaning implementations now, as of Java 9, need no further dependencies to implement Reactive Streams solutions.
There is also a Test compatibility kit available for potential implementations to be tested against, to ensure compliance with the specifications provided by the JDK.
Core to the standard are the set of interfaces contained with the Flow class which resides in above-mentioned module and package.
5.1 Flow.Processor<T,R>
An interface specifying that implementing classes act as both producers and consumers of data in a reactive stream.
- T – the subscribed item type
- R – the published item type
5.2 Flow.Publisher<T>
A Functional interface that produces data for consumption by Subscribers. Stream communication (data, error, completion) with Subscribers is facilitated via the Subscription abstraction. A new Subscriber will subscribe to the Publisher which will create a unique Subscription per Subscriber. The Subscription will serve as the mediation between production and consumption of data.
- T – the published item type
5.3 Flow.Subscriber<T>
An interface specifying the consumption of data, completion indicators and errors. Their is an implied ordering in the invocation of this API, meaning subscription notifications will occur before any item is consumed which occurs chronologically before completion of the stream or of course any errors.
- T – the subscribed item type
5.4 Flow.Subscription
An interface specifying the contract between the Publisher and the Subscriber. Methods on this interface are intended to be invoked only by the Subscriber.
6. The Program Code
The sample program consists of an attempt to regulate the stream flow between 3 participating components, namely QueueWrapper
, AccountValidator
and Mutator
which all contribute to validating an incoming Transaction
and recording the account mutation which can be one of either WITHDRAW
or DEPOSIT
.
I say attempt because getting the stream to regulate itself can be done in so many different ways, the possibilities are endless and can be quite sophisticated. This example attempts to regulate the stream based on downstream capacity of the Subscriber buffers but could also be based on time of day, task latency or other hardware / network metrics etc.
The following diagram depicts the flow:
6.1. QueueWrapper
The QueueWrapper
forms the entry point Publisher and wraps a fake Queue
, but this could be ActiveMQ
or RabbitMQ
etc. in real life. A ScheduledExecutor
service is supplied which periodically polls
the fake Queue
for data based on downstream demand / capacity. The items are then de-queued from the Queue
and submitted to the AccountValidator
by way of the ReceivedSubscription
.
QueueWrapper snippet
public final class QueueWrapper implements Publisher<UnValidatedTransaction> { // Fake backing queue private final Supplier<UnValidatedTransaction> queue; private final ScheduledExecutorService execService; private ReceivedSubscription receivedSubscription; public QueueWrapper(final ScheduledExecutorService execService, final Supplier<UnValidatedTransaction> queue) { Objects.requireNonNull(execService); Objects.requireNonNull(queue); this.queue = queue; this.execService = execService; } // Initialize scheduled Threading to poll the fake queue. public void init() { this.execService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { QueueWrapper.this.receivedSubscription.publish(); } }, Constants.Threading.SCHEDULE_DELAY, Constants.Threading.SCHEDULE_DELAY, TimeUnit.MILLISECONDS); } // Convenience method to shutdown the flow pipeline public void stop() { this.receivedSubscription.cancel(); while (!Thread.currentThread().isInterrupted() && this.receivedSubscription.demand.get() > 0) { try { Thread.sleep(Constants.Threading.AWAIT_TERMINATION); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } @Override public void subscribe(final Subscriber<? super UnValidatedTransaction> subscriber) { // Set the downstream Subscription with the downstream AccountValidator // Processor this.receivedSubscription = new ReceivedSubscription(this.queue, subscriber); // Call back upon subscription with the downstream AccountValidator Processor subscriber.onSubscribe(this.receivedSubscription); } static final class ReceivedSubscription implements Subscription { private final Subscriber<? super UnValidatedTransaction> subscriber; private final Supplier<UnValidatedTransaction> queue; private final AtomicLong demand; private final AtomicBoolean cancelled; private ReceivedSubscription(final Supplier<UnValidatedTransaction> queue, final Subscriber<? super UnValidatedTransaction> subscriber) { this.queue = queue; this.subscriber = subscriber; this.demand = new AtomicLong(); this.cancelled = new AtomicBoolean(false); } @Override public void cancel() { // Upon cancellation set flag to help in request decision making this.cancelled.set(true); // Complete the subscriber AccountValidator Processor this.subscriber.onComplete(); } @Override public void request(final long n) { // Set demand accordingly this.demand.set(this.cancelled.get() ? 0 : n); System.out.printf("Thread %s : Downstream demand is %d\n", Thread.currentThread().getName(), n); } private void publish() { // As long as we have demand poll queue and send items while (this.demand.getAndDecrement() > 0) { final UnValidatedTransaction unvalidatedTransaction = this.queue.get(); // Append only persistence simulated record(unvalidatedTransaction); this.subscriber.onNext(unvalidatedTransaction); } } private void record(final Transaction unvalidatedTransaction) { assert !Objects.isNull(unvalidatedTransaction); System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), unvalidatedTransaction); } } }
6.2. AccountValidator
The AccountValidator
is a Processor that simulates a high latency validation
call to validate the incoming Transaction
. Once validated the Transaction
is submitted to the Mutator
Subscriber via the ValidatedSubscription
interface which will then record the mutation.
AccountValidator snippet
public final class AccountValidator implements Processor<UnValidatedTransaction, ValidatedTransaction> { private final ExecutorService execService; // Buffer to store items to process private final Queue<ValidatedTransaction> buffer; private Subscription receivedSubscription; private ValidatedSubscription validatedSubscription; public AccountValidator(final ExecutorService execService) { this.execService = execService; this.buffer = new ArrayBlockingQueue<>(Constants.SUBSCRIBER_BUFFER_SIZE); } @Override public void onComplete() { // On completion cancel the downstream subscription with the Mutator Subscriber this.validatedSubscription.cancel(); } @Override public void onError(final Throwable throwable) { throwable.printStackTrace(); // On error cancel the downstream subscription with the Mutator Subscriber this.validatedSubscription.cancel(); } @Override public void onNext(final UnValidatedTransaction unvalidatedTransaction) { // For each new item from our upstream QueueWrapper Publisher this.validatedSubscription.publish(unvalidatedTransaction); } @Override public void onSubscribe(final Subscription receivedSubscription) { // Upon subscription set the subscription with the upstream QueueWrapper // Publisher this.receivedSubscription = receivedSubscription; // Request items that we have capacity for from the upstream QueueWrapper // Publisher this.receivedSubscription.request(Constants.SUBSCRIBER_BUFFER_SIZE); } @Override public void subscribe(final Subscriber<? super ValidatedTransaction> subscriber) { // Create new downstream subscription from subscription request from Mutator // Subscriber this.validatedSubscription = new ValidatedSubscription(this, subscriber); // Call back into the Mutator upon subscription subscriber.onSubscribe(this.validatedSubscription); } // Fake append only persistence for dummy event log private void record(final ValidatedTransaction validatedTransaction) { assert Objects.isNull(validatedTransaction); System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), validatedTransaction); } // Downstream Subscription with Mutator Subscriber private static final class ValidatedSubscription implements Subscription { private final AccountValidator accountValidator; private final Subscriber<? super ValidatedTransaction> subscriber; private final ExecutorService execService; private final Queue<ValidatedTransaction> buffer; private final AtomicLong demand; private final AtomicBoolean cancelled; private ValidatedSubscription(final AccountValidator accountValidator, final Subscriber<? super ValidatedTransaction> subscriber) { this.subscriber = subscriber; this.execService = accountValidator.execService; this.buffer = accountValidator.buffer; this.accountValidator = accountValidator; this.demand = new AtomicLong(); this.cancelled = new AtomicBoolean(false); } @Override public void cancel() { // Indicate this subscription is cancelled and call onComplete of downstream // Mutator Subscriber this.cancelled.set(true); this.subscriber.onComplete(); } @Override public void request(final long n) { // Set demand of downstream Mutator Subscriber accordingly this.demand.set(this.cancelled.get() ? 0 : n); // Execute asynchronously work to be down for sending transactions to Mutator // Subscriber this.execService.execute(() -> { // If their is demand and our buffer has items them empty the buffer until // demand is met or buffer is drained. while (this.demand.get() > 0 && !this.buffer.isEmpty()) { this.demand.decrementAndGet(); final ValidatedTransaction validatedTransaction = this.buffer.poll(); ValidatedSubscription.this.subscriber.onNext(validatedTransaction); } System.out.printf("Thread %s : AccountValidator requesting %d : Buffer size %d\n", Thread.currentThread().getName(), (Constants.SUBSCRIBER_BUFFER_SIZE - this.buffer.size()), this.buffer.size()); this.accountValidator.receivedSubscription.request(Constants.SUBSCRIBER_BUFFER_SIZE - this.buffer.size()); }); } private void publish(final UnValidatedTransaction unvalidatedTransaction) { assert !Objects.isNull(unvalidatedTransaction); // Execute asynchronously validation mapping this.execService.execute(() -> { // Simulate high latency transaction validation call try { Thread.sleep(1000l); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } final ValidatedTransaction validatedTransaction = new ValidatedTransaction(unvalidatedTransaction.getAccount(), unvalidatedTransaction.getAmount(), unvalidatedTransaction.getMutation()); this.accountValidator.record(validatedTransaction); // If we have immediate demand then dispatch to downstream Mutator Subscriber // otherwise store in our buffer until demand if (this.demand.getAndDecrement() > 0) { ValidatedSubscription.this.subscriber.onNext(validatedTransaction); } else { this.buffer.offer(validatedTransaction); } }); } } }
6.3. Mutator
The Mutator
is a Subscriber which subscribes to ValidatedTransaction
data from the AccountValidator
and persists the entry into a fake append-only event log.
Mutator snippet
public final class Mutator implements Subscriber<ValidatedTransaction> { private final AtomicInteger counter; private final ExecutorService execService; private Subscription subscription; private AtomicBoolean cancelled; public Mutator(final ExecutorService execService) { this.counter = new AtomicInteger(); this.execService = execService; this.cancelled = new AtomicBoolean(); } @Override public void onComplete() { // On completion set cancelled flag to aid in request decisions from upstream // AccountValidator Publisher this.cancelled.set(true); } @Override public void onError(final Throwable throwable) { throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onNext(final ValidatedTransaction validatedTransaction) { // Execute asynchronously any append only mutation entries and requests for // additional items this.execService.execute(() -> { record(new MutatedTransaction(validatedTransaction.getAccount(), validatedTransaction.getAmount(), validatedTransaction.getMutation())); this.counter.incrementAndGet(); if (this.counter.get() > (Constants.SUBSCRIBER_BUFFER_SIZE - 1)) { this.counter.set(0); request(); } }); } private void request() { System.out.printf("Thread %s : Mutator requesting %d\n", Thread.currentThread().getName(), Constants.SUBSCRIBER_BUFFER_SIZE); // Request more items from AccountValidator Processor provided we are not // cancelled this.subscription.request(this.cancelled.get() ? 0 : Constants.SUBSCRIBER_BUFFER_SIZE); } @Override public void onSubscribe(final Subscription subscription) { // Set Subscription with AccountValidator Processor this.subscription = subscription; // Request more items from AccountValidator Publisher provided we are not // cancelled this.subscription.request(this.cancelled.get() ? 0 : Constants.SUBSCRIBER_BUFFER_SIZE); } // Simulate append only persistence for mutation private void record(final MutatedTransaction mutatedTransaction) { assert Objects.isNull(mutatedTransaction); System.out.printf("Thread %s : %s\n", Thread.currentThread().getName(), mutatedTransaction); } }
6.4 Running the program
You can run the program by navigating to the project root folder of the download and executing the following:
- Build:
mvn clean install package
- Run: navigate to the
target
folder with the project root folder and execute the following:
Sample output from running the program on my machine.
Sample output
Thread pool-2-thread-2 : Mutator requesting 5 Thread pool-2-thread-2 : AccountValidator requesting 5 : Buffer size 0 Thread pool-2-thread-2 : Downstream demand is 5 Thread pool-3-thread-1 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: RECEIVED Mutation: WITHDRAW Thread pool-3-thread-1 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: RECEIVED Mutation: WITHDRAW Thread pool-3-thread-1 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: RECEIVED Mutation: WITHDRAW Thread pool-2-thread-1 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-2 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: VALIDATED Mutation: WITHDRAW Thread pool-2-thread-1 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: VALIDATED Mutation: WITHDRAW Thread pool-2-thread-2 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-2 : Account: db9e0e6b-138c-4858-addb-802c3fea211e Amount: 40 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-2 : Account: b6a1c583-890a-4465-ac82-30740890e459 Amount: 71 Status: MUTATED Mutation: WITHDRAW Thread pool-2-thread-2 : Account: 842afc2d-28b5-4280-a90d-c0ddd76a2272 Amount: 31 Status: MUTATED Mutation: WITHDRAW Thread pool-2-thread-2 : Account: 39224803-f033-444c-8809-6d9cb96b6a34 Amount: 86 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: VALIDATED Mutation: WITHDRAW Thread pool-2-thread-1 : Account: 7992b23a-8e34-451c-8c15-94daa232f330 Amount: 99 Status: MUTATED Mutation: WITHDRAW Thread pool-2-thread-1 : Mutator requesting 5 Thread pool-2-thread-1 : AccountValidator requesting 5 : Buffer size 0 Thread pool-2-thread-1 : Downstream demand is 5 Thread pool-3-thread-1 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: RECEIVED Mutation: DEPOSIT Thread pool-3-thread-1 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: RECEIVED Mutation: WITHDRAW Thread pool-2-thread-2 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-2 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: VALIDATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 91798e93-4d39-46e3-a660-0044f12aa8e0 Amount: 17 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 6a5e4873-672d-4f96-8b00-c0d5c0d627bb Amount: 88 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: e524e8bf-493d-4bb1-989f-5f8cee30727f Amount: 73 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-1 : Account: 49adab7f-cfeb-40d5-97e6-b7ceb99017bf Amount: 59 Status: MUTATED Mutation: DEPOSIT Thread pool-2-thread-2 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: VALIDATED Mutation: WITHDRAW Thread pool-2-thread-2 : Account: 912c05de-95fe-48ba-a4b6-0903a0253975 Amount: 97 Status: MUTATED Mutation: WITHDRAW
7. Summary
In this example tutorial we briefly covered the origins of the Reactive Streams standard, how it works and briefly covered the API introduced in Java 9 as a means to standardize Reactive Stream processing. We also provided a descriptive example that leveraged the standard to demonstrate some of the core abstractions in the Java 9 API.
What I have found doing this exercise is that abstractions on hand are very generic and intentionally so, the level of abstraction would need some lifting for it to be more intuitive for specific domains. Also conventional methods of A.C.I.D. in Reactive Streams, particularly by way of asynchronous exchanges do not work and new methods of persistence / consistency need to be implemented when using designs like this.
In the example code I tried to implement a form of primitive (example) append only event sourcing, where each component (QueueWrapper
, AccountValidator
and Mutator
) each stores their transactions as a sequence of events in some log (simulated by simply writing to console).
8. Download the Source Code
This was a Java 9 Reactive Streams Example.
You can download the full source code of this example here: Java 9 Reactive Streams Example
Why do you invoke ‘mvn clean install package’ when ‘install’ actually requires ‘package’ to be performed?