
Java Fork and Join

In this tutorial, we will look at the Fork/Join framework in Java with examples. It speeds up parallel processing by spreading work across all available processor cores using a divide-and-conquer approach.

The fundamental idea is captured by the following algorithm from the original Fork/Join paper.

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
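
As a rough illustration of how this pseudocode maps onto Java, the sketch below sums an array. The class name ArraySum, the helper sum and the threshold of 1,000 elements are assumptions made for this example only; RecursiveTask itself is covered later in this tutorial.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums a slice of an array using the solve() pattern.
class ArraySum extends RecursiveTask<Long> {

    // Assumed cut-off below which a slice is summed directly.
    private static final int THRESHOLD = 1_000;

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    ArraySum(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // "if (problem is small) directly solve problem"
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // "split problem into independent parts"
        int mid = from + (to - from) / 2;
        ArraySum left = new ArraySum(data, from, mid);
        ArraySum right = new ArraySum(data, mid, to);
        // "fork new subtasks": left is scheduled asynchronously, right runs in this thread
        left.fork();
        long rightSum = right.compute();
        // "join all subtasks" and "compose result from subresults"
        return left.join() + rightSum;
    }

    // Convenience entry point that runs the task on the common pool.
    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ArraySum(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // sum of 1..10000 = 50005000
    }
}
```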

The framework provides efficient construction and management of task queues and worker threads. The heart of the fork/join framework lies in its lightweight scheduling mechanics and work-stealing. Each worker thread maintains a queue that supports both LIFO and FIFO access, which is why it is called a deque (double-ended queue).


By default, a worker thread gets tasks from the head of its own deque. When it is empty, the thread takes a task from the tail of the deque of another busy thread or from the global entry queue, since this is where the biggest pieces of work are likely to be located. This approach minimizes the possibility that threads will compete for tasks.
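
The head/tail behaviour described above can be pictured with a plain java.util.ArrayDeque. This is a toy, single-threaded sketch and not how ForkJoinPool is implemented internally; the class name DequeDemo and the task labels are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy, single-threaded illustration only; not ForkJoinPool's actual internals.
class DequeDemo {

    static String run() {
        Deque<String> workerDeque = new ArrayDeque<>();
        // The owner forks tasks of decreasing size; each new task goes to the head.
        workerDeque.push("fib(38)");
        workerDeque.push("fib(36)");
        workerDeque.push("fib(34)");

        // The owner takes from the head: the most recently forked (smallest) task.
        String ownerTask = workerDeque.pop();
        // An idle worker steals from the tail: the oldest (largest) task.
        String stolenTask = workerDeque.pollLast();

        return "owner=" + ownerTask + ", thief=" + stolenTask + ", left=" + workerDeque;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints: owner=fib(34), thief=fib(38), left=[fib(36)]
    }
}
```

Because the owner and the thief work at opposite ends, they rarely touch the same task, which is the contention-reducing property the paragraph above refers to.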

1. Fork & Join

Let us use the framework to compute numbers in the Fibonacci sequence. In mathematics, the Fibonacci sequence starts with 0 and 1, and each subsequent number is the sum of the two preceding ones. For example, to print number 10 in the sequence, we have to calculate numbers 8 and 9 in the sequence and add them up.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class Fibonacci extends ForkJoinTask<Integer> {

    private int input;
    private int sum = 0;

    public Fibonacci(int input) {

        this.input = input;
    }

    @Override
    public Integer getRawResult() {
        if (!isDone())
            return null;
        return sum;
    }

    @Override
    protected void setRawResult(Integer value) {
        this.sum = value;
    }

    @Override
    protected boolean exec() {
        if (input <= 5) {
            sum = seqFib(input);
            return true;
        }

        Fibonacci task1 = new Fibonacci(input - 1);
        Fibonacci task2 = new Fibonacci(input - 2);
        task1.fork();
        sum = task2.invoke() + task1.join();
        return true;
    }

    int seqFib(int n) {
        if (n <= 1)
            return n;
        return seqFib(n - 1) + seqFib(n - 2);
    }
  
    public static void main(String[] args) {
        Fibonacci task = new Fibonacci(40);
        System.out.println(new ForkJoinPool().invoke(task));
    }
}

Running the above program produces the below result:

 102334155

This program takes the position of the number to compute as input through the constructor parameter input. To use the Fork/Join framework, our class must extend ForkJoinTask, which ensures that ForkJoinPool can execute this task. We will discuss ForkJoinPool in the next section.

The main execution method is exec. In the above example, we break the problem into two subproblems, mirroring the Fibonacci recurrence. The first task is forked, which means it is scheduled to run asynchronously, possibly in parallel on another worker thread.

Next is the join phase, in which the results of all the tasks are combined to generate the final result. Here we invoke the second task to compute its result and add it to the result of the first task. The invoke method runs the task in the current thread, i.e. it calls the exec method, waits for completion and returns the result. The join method makes the current task wait for the result of the first task.
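
The fork, invoke and join steps can also be expressed with the static ForkJoinTask.invokeAll method, which schedules both subtasks and returns once both are done. The following is a minimal sketch of that alternative, not the approach used in the example above; the class name FibonacciInvokeAll and the helper fib are assumptions for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical variant of the Fibonacci task, shown only to contrast
// invokeAll with the explicit fork/invoke/join sequence.
class FibonacciInvokeAll extends ForkJoinTask<Integer> {

    private final int input;
    private Integer result;

    FibonacciInvokeAll(int input) {
        this.input = input;
    }

    @Override
    public Integer getRawResult() {
        return result;
    }

    @Override
    protected void setRawResult(Integer value) {
        this.result = value;
    }

    @Override
    protected boolean exec() {
        if (input <= 5) {
            result = seqFib(input);
            return true;
        }
        FibonacciInvokeAll task1 = new FibonacciInvokeAll(input - 1);
        FibonacciInvokeAll task2 = new FibonacciInvokeAll(input - 2);
        // Schedules both subtasks and returns once both are done.
        invokeAll(task1, task2);
        // Both tasks are complete here, so join() returns without waiting.
        result = task1.join() + task2.join();
        return true;
    }

    static int seqFib(int n) {
        return n <= 1 ? n : seqFib(n - 1) + seqFib(n - 2);
    }

    // Convenience entry point that runs the task on the common pool.
    static int fib(int n) {
        return ForkJoinPool.commonPool().invoke(new FibonacciInvokeAll(n));
    }

    public static void main(String[] args) {
        System.out.println(fib(30)); // 832040
    }
}
```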

This, in a nutshell, is the concept of Java fork and join. As stated in the paper, not all tasks need to be computed in parallel. In our example, we compute the value sequentially for inputs of 5 or less, using the seqFib method.

2. ForkJoinPool

ForkJoinPool is the heart of the framework. It is an implementation of the ExecutorService which manages worker threads and provides tools to get information about the thread pool state and performance.

Worker threads can execute only one task at a time, but ForkJoinPool doesn’t create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue which stores tasks.

    public static void main(String[] args) {
        Fibonacci task = new Fibonacci(40);
        System.out.println(new ForkJoinPool().invoke(task));
    }

We have created a new instance of ForkJoinPool and invoked the main task to print number 40 in the Fibonacci sequence. On completion, the invoke method obtains the value through the getRawResult method. In that method, we check whether the task is done, which is determined by exec returning true, and if so return the computed value 102334155.
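
To get a feel for the monitoring methods mentioned earlier, the sketch below creates a pool with an explicit parallelism level, runs a trivial task and prints a few pool statistics. The class name PoolInspection and the helper sumOnPool are assumptions for this example; the printed statistics are snapshots and will differ between runs and machines.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical demo class; only the ForkJoinPool methods are real API.
class PoolInspection {

    static int sumOnPool(int parallelism) {
        // The constructor argument sets the target number of worker threads.
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // submit() schedules the task asynchronously; join() waits for its result.
            ForkJoinTask<Integer> task = pool.submit(() -> {
                int sum = 0;
                for (int i = 1; i <= 100; i++) {
                    sum += i;
                }
                return sum;
            });
            int result = task.join();

            // Monitoring values are snapshots and vary between runs.
            System.out.println("parallelism    = " + pool.getParallelism());
            System.out.println("pool size      = " + pool.getPoolSize());
            System.out.println("active threads = " + pool.getActiveThreadCount());
            System.out.println("steal count    = " + pool.getStealCount());
            return result;
        } finally {
            // A pool created with new should be shut down when no longer needed.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result         = " + sumOnPool(4)); // result is 5050
    }
}
```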

3. RecursiveTask

In the above example, we extended ForkJoinTask directly. The framework provides an easier abstraction over ForkJoinTask, RecursiveTask, which removes the burden of maintaining the result state and checking for completion.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Factorial extends RecursiveTask<Double> {

    private long start;
    private long end;

    public static final long threshold = 5;

    public Factorial(long number) {
        this(1, number);
    }

    private Factorial(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        long length = (end - start + 1);
        if (length <= threshold) {
            return factorial();
        }

        long mid = length / 2;
        Factorial firstTask = new Factorial(start, start + mid);
        Factorial secondTask = new Factorial(start + mid + 1, end);
        firstTask.fork();
        return secondTask.compute() * firstTask.join();

    }

    private Double factorial() {
        Double result = 1.0;
        for (long i = start; i <= end; i++) {
            result *= i;
        }
        return result;
    }


    public static void main(String[] args) {
        ForkJoinTask<Double> task = new Factorial(100);
        System.out.println(ForkJoinPool.commonPool().invoke(task));
    }

}

Running the above program produces the below result

9.332621544394414E157 

In this example, compute is the main workhorse method. Similar to the previous example,

  • If the range contains 5 numbers or fewer, we compute the product sequentially.
  • If not, we break the range into two subproblems, forking one and computing the other directly.
  • The next step is joining the results of both tasks.
  • The tasks recursively perform the same steps and finally the results are combined.

In this example, we do not store the result in a field but simply return the computed value. RecursiveTask also takes care of tracking completion and returning the result on invocation. In addition, the main method uses the common pool (ForkJoinPool.commonPool()), which the Java documentation describes as appropriate for most applications because it reduces resource usage.
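
Note that a Double holds only about 16 significant decimal digits, so the value printed above is an approximation of 100!. If exact digits are needed, the same divide-and-conquer structure can be used with BigInteger. The following is a sketch under that assumption; the class name BigFactorial and the helper factorial are made up for this example.

```java
import java.math.BigInteger;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical exact-arithmetic variant of the Factorial task.
class BigFactorial extends RecursiveTask<BigInteger> {

    // Same cut-off as in the example above: ranges of 5 numbers or fewer
    // are multiplied sequentially.
    private static final long THRESHOLD = 5;

    private final long start; // inclusive
    private final long end;   // inclusive

    BigFactorial(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected BigInteger compute() {
        if (end - start + 1 <= THRESHOLD) {
            BigInteger product = BigInteger.ONE;
            for (long i = start; i <= end; i++) {
                product = product.multiply(BigInteger.valueOf(i));
            }
            return product;
        }
        // Split the range into two halves.
        long mid = start + (end - start) / 2;
        BigFactorial lower = new BigFactorial(start, mid);
        BigFactorial upper = new BigFactorial(mid + 1, end);
        lower.fork();
        return upper.compute().multiply(lower.join());
    }

    // Convenience entry point that runs the task on the common pool.
    static BigInteger factorial(long n) {
        return ForkJoinPool.commonPool().invoke(new BigFactorial(1, n));
    }

    public static void main(String[] args) {
        System.out.println(factorial(20)); // 2432902008176640000
    }
}
```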

4. RecursiveAction

RecursiveAction is very similar to RecursiveTask except that it does not return a result, and hence it is used when a certain action is to be performed in parallel. We can look at the concept with the below example, in which we increment every element of an array by 1.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

public class ForkJoinAdd extends RecursiveAction {

    private int start;
    private int end;
    private int[] array;

    public ForkJoinAdd(int start, int end, int[] array) {
        this.start = start;
        this.end = end;
        this.array = array;
    }

    @Override
    protected void compute() {
        int length = (end - start + 1);

        if (length <= 10) {
            for (int i = start; i <= end; i++) {
                array[i] += 1;
            }
            return;
        }

        int mid = length / 2;
        RecursiveAction firstTask = new ForkJoinAdd(start, start + mid - 1, array);
        RecursiveAction secondTask = new ForkJoinAdd(start + mid, end, array);
        firstTask.fork();
        secondTask.invoke();
        firstTask.join();
    }

    public static void main(String[] args) {
        int numSize = 1_000;
        final int[] array = new int[numSize];
        IntStream.range(0, array.length).forEach(index -> array[index] = index);
        ForkJoinAdd forkJoinAdd = new ForkJoinAdd(0, numSize - 1, array);
        ForkJoinPool.commonPool().invoke(forkJoinAdd);
        // Print the sum of the incremented elements.
        System.out.println(IntStream.of(array).sum());
    }
}

Running the above program increments each element of the array by 1 and prints the sum of all the elements, which gives the below result:

500500

The example and concept are very similar to the previous one, except that invoke and join do not return any result. Instead, the tasks modify the array in place. Here, we have specified the threshold as 10 and break the problem into subproblems if the size of the input is greater than 10.
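
Since the best threshold depends on the workload, it can help to make it a parameter rather than a hard-coded value. The sketch below is one possible way to do that, using invokeAll because there is no result to combine; the class name IncrementAction and the helper incrementAll are assumptions for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

// Hypothetical variant of the increment action with a configurable threshold.
class IncrementAction extends RecursiveAction {

    private final int[] array;
    private final int from;      // inclusive
    private final int to;        // exclusive
    private final int threshold; // slices of this size or smaller run sequentially

    IncrementAction(int[] array, int from, int to, int threshold) {
        this.array = array;
        this.from = from;
        this.to = to;
        this.threshold = threshold;
    }

    @Override
    protected void compute() {
        if (to - from <= threshold) {
            for (int i = from; i < to; i++) {
                array[i] += 1;
            }
            return;
        }
        int mid = from + (to - from) / 2;
        // Schedules both halves and returns once both are done.
        invokeAll(new IncrementAction(array, from, mid, threshold),
                  new IncrementAction(array, mid, to, threshold));
    }

    // Increments every element in place and returns the same array.
    static int[] incrementAll(int[] array, int threshold) {
        if (threshold < 1) {
            // A threshold below 1 would keep splitting single-element slices forever.
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        ForkJoinPool.commonPool().invoke(new IncrementAction(array, 0, array.length, threshold));
        return array;
    }

    public static void main(String[] args) {
        int[] array = IntStream.range(0, 1_000).toArray();
        incrementAll(array, 10);
        System.out.println(IntStream.of(array).sum()); // 1 + 2 + ... + 1000 = 500500
    }
}
```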

5. Download the Source code

In this tutorial, we explored the Java Fork/Join framework using examples.

Download
You can download the full source code of this example here: Java Fork and Join

Rajagopal ParthaSarathi

Rajagopal works in software industry solving enterprise-scale problems for customers across geographies specializing in distributed platforms. He holds a masters in computer science with focus on cloud computing from Illinois Institute of Technology. His current interests include data science and distributed computing.