Comparing Java Future, CompletableFuture and RxJava Observable

Future, CompletableFuture, and RxJava’s Observable are all mechanisms for asynchronous programming in Java, but they have different characteristics and use cases. In this article, we explore the key differences between these three constructs and provide code examples to illustrate them.

1. Observable from RxJava

RxJava is a library for reactive programming in Java. Its Observable type lets developers work with asynchronous and event-driven code in an organized, declarative manner. An Observable represents a stream of data or events that can be observed by multiple subscribers: it emits items over time, and you subscribe to it to receive those items.

1.1 Key Points about Observable in RxJava

Below are some key points to understand about Observable in RxJava.

  • Data or Event Stream: An Observable represents a sequence of data items or events that can be emitted over time. The items can be of any type and are delivered one at a time; operators such as buffer() can group them into batches.
  • Producer of Data: An Observable acts as the source of the data stream. It is the producer of data or events, and it can emit these items to one or more subscribers.
  • Lazy Execution: An Observable is lazy by default. This means that it doesn’t start emitting items until a subscriber subscribes to it, which avoids doing work that nobody will consume.
  • Subscription: Subscribers can subscribe to an Observable to receive the emitted items or events. When a subscriber subscribes, it establishes a connection with the Observable and starts receiving data.
  • Error Handling: If an error occurs during the emission of items, an Observable emits an onError signal so that subscribers can handle the failure gracefully. No further items are emitted after onError.

1.2 Example of Using an Observable in RxJava

To use RxJava’s Observable, first include the RxJava library in your project by adding the necessary dependency to your project’s build file. In a Maven-based project, we can include RxJava in our pom.xml like this:

    <dependencies>
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.6</version>
        </dependency>
    </dependencies>

Below is an example of how we can create and use an Observable in RxJava.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

public class ObservableExample {

    public static void main(String[] args) {

        // Create an Observable that emits a sequence of some great authors
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Charles Dickens");
            emitter.onNext("Jonathan Swift");
            emitter.onNext("Chinua Achebe");
            emitter.onNext("William Shakespeare");
            emitter.onNext("Thomas Paine");
            emitter.onComplete(); // Indicates that the observable has completed
        });

        // Create an Observer to subscribe to the Observable.
        // The type parameter is required so that onNext(String) overrides correctly.
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Subscribed to the Observable.");
            }

            @Override
            public void onNext(String value) {
                System.out.println("Received: " + value);
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Observable completed.");
            }
        };

        // Subscribe the Observer to the Observable
        observable.subscribe(observer);
    }
}

In the above example, we create an Observable named observable using the Observable.create() method. Inside the create block, we manually emit five String values (the names of some great authors) using the emitter.onNext() method and then signal that the observable has completed using emitter.onComplete().

Next, we create an Observer named observer that defines how we handle the emitted values as well as the completion and error events. The onSubscribe method is called when the observer subscribes to the observable. Finally, we subscribe the observer to the observable using the observable.subscribe(observer) method. When we run this code, we will see the following output:

Fig 1. The output of running RxJava’s Observable example

2. Future Interface

The Future interface is part of the java.util.concurrent package and represents the result of a computation that runs concurrently or asynchronously, so the submitting thread does not have to wait while the task executes. It was introduced in Java 5 (Java 1.5) as part of the Java concurrency utilities. One of the most common implementations is the java.util.concurrent.FutureTask class, which wraps a Callable or Runnable so that it can be executed on another thread. Future does not offer a built-in mechanism for callbacks or notifications upon completion; the caller has to poll isDone() or block in get().
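To make the FutureTask mention above concrete, here is a minimal sketch that wraps a Callable in a FutureTask and runs it on a plain thread. The class name FutureTaskSketch and the method runOnPlainThread are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical class name, used only for this illustration
class FutureTaskSketch {

    // Wraps a Callable in a FutureTask, runs it on a separate thread,
    // and waits for the result.
    static int runOnPlainThread() {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);

        // FutureTask implements Runnable, so a Thread can execute it directly
        Thread worker = new Thread(task);
        worker.start();

        try {
            return task.get(); // Blocks until the worker has produced the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + runOnPlainThread()); // prints "Result: 42"
    }
}
```

Because FutureTask is both a Runnable and a Future, the same object is handed to the thread and later queried for the result. There is still no callback: the caller has to invoke get() itself.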

2.1 Key Aspects of the Future Interface

Here are some of the key aspects of the Future Interface in Java:

  • Representation of a Computation: The primary purpose of the Future interface is to represent a task that is running asynchronously. This task can be any operation that might take some time to complete, such as database queries or complex calculations.
  • Non-blocking Submission: The Future interface allows developers to initiate a task and then continue with other tasks or operations without waiting for the original task to finish. Only the call to get() blocks, and only if the result is not yet available.
  • Querying Task Status: The Future interface provides methods for checking the status of the associated task. You can use isDone() to check whether the task has completed and isCancelled() to see whether it was canceled.
  • Retrieving the Result: The Future interface allows you to retrieve the result of a computation once it’s complete by calling get().
  • Timeout Handling: The overloaded get(long timeout, TimeUnit unit) method lets you specify a maximum amount of time to wait for a computation to complete. If the task isn’t completed within the specified time, a TimeoutException is thrown.
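The status and timeout points above can be sketched as follows. This is an illustrative example rather than a recommended pattern; the class name FutureTimeoutSketch, the method waitBriefly, and the chosen durations are assumptions made for the demonstration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class name, used only for this illustration
class FutureTimeoutSketch {

    // Waits briefly for a slow task and reports what happened to it.
    static String waitBriefly() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000); // Simulate slow work (2 seconds)
            return "done";
        });

        try {
            // Wait at most 100 ms, far less than the task needs
            return future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // Request cancellation and interrupt the worker
            return "timed out, cancelled=" + future.isCancelled()
                    + ", done=" + future.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdownNow(); // Release the executor's thread
        }
    }

    public static void main(String[] args) {
        // prints "timed out, cancelled=true, done=true"
        System.out.println(waitBriefly());
    }
}
```

Note that isDone() also returns true for a canceled task: it reports that the task is finished, whether it ended normally, with an exception, or through cancellation.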

2.2 Example of Using the Future Interface

Below is a basic example of how to use the Future Interface in Java to perform asynchronous computations using the ExecutorService:


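import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureExample {

    public static void main(String[] args) {
        // Create an ExecutorService with a single thread
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // Submit a task to the ExecutorService
        Future<Integer> future = executorService.submit(() -> {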
            Thread.sleep(1000); // Simulate a time-consuming operation
            return 9; // The result of the computation
        });

        // Continue with other tasks while the above task is running

        // Check if the task is done
        if (future.isDone()) {
            try {
                // Retrieve the result when the task is complete
                Integer result = future.get();
                System.out.println("Task completed. Result: " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Task is not yet complete. Continuing with other tasks...");
        }

        // Shutdown the ExecutorService when done
        executorService.shutdown();
    }
}


In this example, we create an ExecutorService with a single thread using Executors.newSingleThreadExecutor(), which is responsible for running our asynchronous task. We submit a task to the ExecutorService using the submit() method. The task is defined as a lambda that sleeps for 1 second and then returns the integer value 9 as a result.

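After submitting the task, we can continue with other tasks without waiting for it to complete. We use the isDone() method to check if the task is complete. If it is, we use the get() method to retrieve the result. If the task is not yet complete, we print a message indicating that and continue with other tasks. Because the task sleeps for one second and the check happens immediately after submission, this program will normally take the second branch; to actually obtain the result, call get() later, which blocks until the value is available. Finally, we shut down the ExecutorService to release its resources.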

3. The CompletableFuture Class

The CompletableFuture class is used for managing asynchronous tasks in modern Java applications. It was introduced in Java 8 as part of the java.util.concurrent package and implements both the Future and CompletionStage interfaces. A CompletableFuture holds a value that may not be available yet. This value can be the result of a computation or an exception that occurred during the computation.
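The idea of a holder that is later filled with either a value or an exception can be sketched with the complete() and completeExceptionally() methods. The class name ManualCompletionSketch, its method names, and the variable name promise are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical class name, used only for this illustration
class ManualCompletionSketch {

    // The future starts out empty; another thread supplies the value later.
    static String completeWithValue() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        new Thread(() -> promise.complete("hello")).start();
        return promise.join(); // Waits until complete() has been called
    }

    // The future is completed with an exception instead of a value.
    static String completeWithError() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.completeExceptionally(new IllegalStateException("boom"));
        try {
            return promise.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(completeWithValue()); // prints "hello"
        System.out.println(completeWithError()); // prints "failed: boom"
    }
}
```

This explicit completion is what the "Completable" in the name refers to: a plain Future can only be completed by the task it represents, whereas any code holding a reference to a CompletableFuture can complete it.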

3.1 Key Features of CompletableFuture

Listed below are some key features associated with the CompletableFuture class:

  • Asynchronous Computation: CompletableFuture is used for tasks that can run asynchronously, without blocking the main thread of your program.
  • Chaining Operations: CompletableFuture can chain multiple asynchronous operations together. Developers can specify what should happen when a previous CompletableFuture completes using methods like thenApply, thenCompose, thenCombine, and more.
  • Exception Handling: CompletableFuture provides methods for handling exceptions that may occur when performing asynchronous tasks, such as exceptionally, handle, and exceptionallyCompose (the last of which was added in Java 12).
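The chaining and exception-handling features listed above can be sketched as follows. The class ChainingSketch and the helper priceOf (a made-up asynchronous lookup that derives a number from the length of a name) are hypothetical and only serve to give the chain something to operate on.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class name, used only for this illustration
class ChainingSketch {

    // Made-up asynchronous lookup: the "price" is the name length times 10
    static CompletableFuture<Integer> priceOf(String item) {
        return CompletableFuture.supplyAsync(() -> item.length() * 10);
    }

    static int total() {
        return CompletableFuture.supplyAsync(() -> "book")
                .thenApply(String::toUpperCase)             // Transform the value: "BOOK"
                .thenCompose(ChainingSketch::priceOf)       // Chain a second async step: 40
                .thenCombine(priceOf("pen"), Integer::sum)  // Merge with an independent future: 40 + 30
                .join();
    }

    static int recovered() {
        return CompletableFuture.<Integer>supplyAsync(() -> {
                    throw new IllegalArgumentException("bad input");
                })
                .thenApply(n -> n + 1)    // Skipped because the previous stage failed
                .exceptionally(ex -> -1)  // Supply a fallback value instead
                .join();
    }

    public static void main(String[] args) {
        System.out.println(total());     // prints 70
        System.out.println(recovered()); // prints -1
    }
}
```

thenApply transforms a result with an ordinary function, thenCompose is used when the next step itself returns a CompletableFuture, and thenCombine joins two independent futures. When a stage fails, the dependent stages are skipped until a handler such as exceptionally or handle is reached.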

3.2 Example of Using a CompletableFuture

Below is a simple example of how to use CompletableFuture in Java to perform asynchronous operations:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        // Asynchronously compute 42, then keep a reference to the LAST stage
        // of the chain so that join() waits for every step, not just the first
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 42)
                .thenApply(result -> result * 2) // Double the result
                .thenAccept(finalResult -> System.out.println("Result: " + finalResult))
                .exceptionally(ex -> {
                    System.err.println("Exception: " + ex);
                    return null;
                });

        // Block until the whole chain is done. join() throws only unchecked
        // exceptions, and failures are already handled by exceptionally() above
        future.join();

        System.out.println("Main thread continues to execute other tasks...");
    }
}

In the above example, we create a CompletableFuture that asynchronously computes the value 42 and then chain it to double the result and print it. We call future.join() to make the main thread wait for the CompletableFuture to complete. Blocking like this gives up the benefit of running the work asynchronously, so in production code it is best avoided or kept to the outer edges of a program, with further work attached through chained stages instead.

4. Conclusion

In this article, we listed some of the key features of Future, CompletableFuture and RxJava Observable. We also showed some basic usage and examples of how to execute tasks concurrently using Future, CompletableFuture and RxJava Observable.

In conclusion, understanding the differences between Future, CompletableFuture and RxJava Observable is crucial for effective asynchronous programming in Java. The choice between these tools depends on the requirements of your application. Use Observables when dealing with complex event-driven scenarios, Futures for simple asynchronous tasks, and CompletableFuture for more control and flexibility in managing asynchronous operations.

5. Download the Source Code

This was an example of the difference between RxJava’s Observable, Future, and CompletableFuture in Java.

Download
You can download the full source code of this example here: Difference Between Rxjava’s Observable, Future, and CompletableFuture

Omozegie Aziegbe

Omos holds a Master’s degree in Information Engineering with Network Management from the Robert Gordon University, Aberdeen. Omos is a freelance web/application developer currently focused on developing Java enterprise applications with the Jakarta EE framework.