In this tutorial, we will look at the Fork/Join framework in Java with examples. It speeds up parallel processing by using all available processor cores through a divide-and-conquer approach.

The fundamental idea is captured by the following algorithm from the original Fork/Join paper.

```
Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
```

The framework takes care of constructing and managing task queues and worker threads efficiently. The heart of the Fork/Join framework lies in its lightweight scheduling mechanics and work stealing. Each worker thread maintains its own queue that supports both LIFO and FIFO access, hence the name deque (double-ended queue).

By default, a worker thread takes tasks from the head of its own deque. When its deque is empty, the thread takes a task from the tail of another busy thread's deque or from the global entry queue, since this is where the biggest pieces of work are likely to be located. This approach minimizes the chance that threads compete for the same tasks.
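The deque discipline described above can be sketched with a plain `java.util.ArrayDeque`. This only illustrates which end each party uses; it is not how `ForkJoinPool` is implemented internally, and the class name `WorkStealingSketch`, its helper methods and the task labels are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WorkStealingSketch {

    // The owner takes the most recently pushed task from the head (LIFO).
    public static String ownerTake(Deque<String> deque) {
        return deque.pollFirst();
    }

    // A thief takes the oldest task from the tail of another thread's deque.
    public static String steal(Deque<String> victim) {
        return victim.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> deque = new ArrayDeque<>();
        // The biggest piece of work is pushed first, so it ends up at the tail.
        deque.addFirst("big");
        deque.addFirst("medium");
        deque.addFirst("small");

        System.out.println("owner takes : " + ownerTake(deque)); // small
        System.out.println("thief steals: " + steal(deque));     // big
        System.out.println("left behind : " + deque);            // [medium]
    }
}
```

Because the owner and the thief work on opposite ends, they only meet when the deque is nearly empty, which is the reason contention stays low.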

## 1. Fork & Join

Let us use the framework to compute a number in the Fibonacci sequence. In mathematics, each number in the Fibonacci sequence is the sum of the two preceding ones, starting from 0 and 1. For example, to print number 10 in the sequence, we have to calculate numbers 8 and 9 in the sequence and add them up.

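```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Fibonacci extends ForkJoinTask<Integer> {

    private final int input;
    private int sum = 0;

    public Fibonacci(int input) {
        this.input = input;
    }

    @Override
    public Integer getRawResult() {
        // The result is only meaningful once the task has completed.
        if (!isDone())
            return null;
        return sum;
    }

    @Override
    protected void setRawResult(Integer value) {
        this.sum = value;
    }

    @Override
    protected boolean exec() {
        // Small inputs are cheaper to solve sequentially than to split.
        if (input <= 5) {
            sum = seqFib(input);
            return true;
        }

        Fibonacci task1 = new Fibonacci(input - 1);
        Fibonacci task2 = new Fibonacci(input - 2);
        task1.fork();                        // schedule task1 asynchronously
        sum = task2.invoke() + task1.join(); // run task2 here, then wait for task1
        return true;
    }

    int seqFib(int n) {
        if (n <= 1)
            return n;
        return seqFib(n - 1) + seqFib(n - 2);
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println(forkJoinPool.invoke(new Fibonacci(40)));
    }
}
```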

Running the above program produces the following result:

`102334155`

This program takes the position in the sequence to compute through the constructor argument `input`. To use the Fork/Join framework, our class must extend `ForkJoinTask`, which ensures that `ForkJoinPool` can execute the task. We will discuss `ForkJoinPool` in the next section.

The main execution method is `exec`. In the above example, we break the problem into two subproblems, mirroring the Fibonacci definition. The first task is forked, which means it is scheduled to run asynchronously, potentially in parallel on another worker thread.

Next is the join phase, in which the results of all the tasks are combined to produce the final result. Here we invoke the second task to compute its result and add it to the result of the first task. The `invoke` method runs the task, i.e. it calls the `exec` method, waits for the result and tracks the completion of the task. The `join` method ensures that the program waits for the result of the first task.

This, in a nutshell, is the concept of fork and join in Java. As stated in the paper, not all tasks need to be computed in parallel. In our example, we compute the series sequentially for inputs of 5 or less, which is handled by the `seqFib` method.
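To get a feel for why the sequential cutoff matters, the sketch below counts how many task objects the recursive split would create for a given input and threshold. It does not run the framework at all; `ThresholdSketch` and `taskCount` are hypothetical names, and the recurrence assumes the same split into `input - 1` and `input - 2` used in the example above.

```java
public class ThresholdSketch {

    // Counts the tasks created when inputs <= threshold are solved
    // sequentially and larger inputs are split into (n - 1) and (n - 2).
    public static long taskCount(int n, int threshold) {
        if (n <= threshold) {
            return 1; // a single leaf task, no further splitting
        }
        return 1 + taskCount(n - 1, threshold) + taskCount(n - 2, threshold);
    }

    public static void main(String[] args) {
        int n = 30;
        for (int threshold : new int[] {5, 10, 20}) {
            System.out.println("threshold " + threshold + " -> "
                    + taskCount(n, threshold) + " tasks");
        }
    }
}
```

A higher threshold means fewer, larger tasks and less scheduling overhead, while a lower one exposes more parallelism. The right value depends on the workload and is usually found by measuring.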

## 2. ForkJoinPool

`ForkJoinPool` is the heart of the framework. It is an implementation of `ExecutorService` that manages worker threads and provides tools to get information about the thread pool state and performance.
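As a rough illustration of the monitoring side, the sketch below reads a few of the values that `ForkJoinPool` exposes through its getter methods. The class name `PoolStatsSketch`, the `stats` helper and the parallelism of 4 are assumptions made for the example; the actual numbers printed will vary between runs and machines.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class PoolStatsSketch {

    // Collects a few of the monitoring values ForkJoinPool exposes.
    public static String stats(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()
                + ", poolSize=" + pool.getPoolSize()
                + ", activeThreads=" + pool.getActiveThreadCount()
                + ", queuedTasks=" + pool.getQueuedTaskCount()
                + ", steals=" + pool.getStealCount();
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(4); // parallelism of 4 is an arbitrary choice
        System.out.println("idle : " + stats(pool));

        // Run a trivial task so that at least one worker thread is started.
        pool.submit(() -> System.out.println(
                "hello from " + Thread.currentThread().getName())).join();
        System.out.println("after: " + stats(pool));

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```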

Worker threads can execute only one task at a time, but `ForkJoinPool` doesn’t create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue which stores tasks.

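```java
public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    System.out.println(forkJoinPool.invoke(new Fibonacci(40)));
}
```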

We instantiate a new `ForkJoinPool` and invoke the main task to print the 40th number in the Fibonacci sequence. On completion, the `invoke` method obtains the value through the `getRawResult` method. In that method we check whether the task has completed, which depends on the outcome of the `exec` method, and if so return the computed value 102334155.

In the above example, we extended `ForkJoinTask` directly to use the Fork/Join framework. `RecursiveTask` is an easier abstraction over `ForkJoinTask` that removes the overhead of maintaining the result state and checking for completion.
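The completion check in `getRawResult` can be observed in isolation with a much smaller task. The sketch below is a hypothetical `SquareTask` that follows the same pattern as the Fibonacci class; the names `RawResultSketch` and `SquareTask` are invented for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class RawResultSketch {

    // Hypothetical minimal task: squares its input.
    public static class SquareTask extends ForkJoinTask<Integer> {
        private final int input;
        private int result;

        public SquareTask(int input) {
            this.input = input;
        }

        @Override
        public Integer getRawResult() {
            // Same guard as in the Fibonacci example: no result until done.
            if (!isDone()) {
                return null;
            }
            return result;
        }

        @Override
        protected void setRawResult(Integer value) {
            this.result = value;
        }

        @Override
        protected boolean exec() {
            result = input * input;
            return true; // true signals that the task completed normally
        }
    }

    public static void main(String[] args) {
        SquareTask task = new SquareTask(12);
        System.out.println("before invoke: " + task.getRawResult()); // null
        Integer value = new ForkJoinPool().invoke(task);
        System.out.println("after invoke : " + value);               // 144
    }
}
```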

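```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Factorial extends RecursiveTask<Double> {

    private final long start;
    private final long end;

    public static final long threshold = 5;

    public Factorial(long number) {
        this(1, number);
    }

    private Factorial(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        long length = (end - start + 1);
        // Small ranges are multiplied sequentially.
        if (length <= threshold) {
            return factorial();
        }

        long mid = length / 2;
        Factorial firstTask = new Factorial(start, start + mid);
        Factorial secondTask = new Factorial(start + mid + 1, end);
        firstTask.fork();                              // run the first range asynchronously
        return secondTask.invoke() * firstTask.join(); // run the second range, then combine
    }

    private Double factorial() {
        double result = 1.0;
        for (long i = start; i <= end; i++) {
            result *= i;
        }
        return result;
    }

    public static void main(String[] args) {
        // Use the common pool instead of creating a new one.
        System.out.println(ForkJoinPool.commonPool().invoke(new Factorial(100)));
    }

}
```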

Running the above program produces the following result:

`9.332621544394414E157`

In this example, `compute` is the main workhorse method. Similar to the above example:

• If the range holds 5 numbers or fewer, we compute its product sequentially.
• If not, we break it into two subproblems, invoking one and forking the other.
• The next step is joining the results of both tasks.
• The tasks recursively perform the same steps and finally the results are combined.

In this example, we do not maintain the result ourselves but simply return the computed value. `RecursiveTask` also checks whether the job is completed and returns the result on invocation. In addition, Java recommends using the common thread pool for efficient resource consumption, which is what the main method does.
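The same pattern applies to any computation that splits into independent ranges. As a minimal sketch, the class below sums a range of numbers on the common pool. `RangeSumSketch`, its `sum` helper and the threshold of 1,000 are assumptions chosen for illustration. Unlike the example above, it calls `compute()` directly on one half instead of `invoke()`, which is the form shown in the `RecursiveTask` Javadoc.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RangeSumSketch extends RecursiveTask<Long> {

    private static final long THRESHOLD = 1_000; // arbitrary sequential cutoff

    private final long start; // inclusive
    private final long end;   // inclusive

    public RangeSumSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start + 1 <= THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = start + (end - start) / 2;
        RangeSumSketch left = new RangeSumSketch(start, mid);
        RangeSumSketch right = new RangeSumSketch(mid + 1, end);
        left.fork();                          // left half runs asynchronously
        return right.compute() + left.join(); // right half runs in this thread
    }

    // Convenience entry point that runs the task on the common pool.
    public static long sum(long from, long to) {
        return ForkJoinPool.commonPool().invoke(new RangeSumSketch(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1_000_000)); // 500000500000
    }
}
```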

## 3. RecursiveAction

`RecursiveAction` is very similar to `RecursiveTask` except that it does not return a result, and hence it is used when a certain action is to be performed in parallel. We can look at the concept with the example below, where we increment every element of an array by 1.

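```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

public class ForkJoinAdd extends RecursiveAction {

    private final int start;
    private final int end;
    private final int[] array;

    public ForkJoinAdd(int start, int end, int[] array) {
        this.start = start;
        this.end = end;
        this.array = array;
    }

    @Override
    protected void compute() {
        int length = (end - start + 1);

        // Small ranges are incremented sequentially.
        if (length <= 10) {
            for (int i = start; i <= end; i++) {
                array[i] += 1;
            }
            return;
        }

        int mid = length / 2;
        ForkJoinAdd firstTask = new ForkJoinAdd(start, start + mid, array);
        ForkJoinAdd secondTask = new ForkJoinAdd(start + mid + 1, end, array);
        firstTask.fork();    // run the first range asynchronously
        secondTask.invoke(); // run the second range in this thread
        firstTask.join();    // wait for the first range to finish
    }

    public static void main(String[] args) {
        int numSize = 1_000;
        final int[] array = new int[numSize];
        IntStream.range(0, array.length).forEach(index -> array[index] = index);
        ForkJoinPool.commonPool().invoke(new ForkJoinAdd(0, numSize - 1, array));
        System.out.println(IntStream.of(array).sum());
    }
}
```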

Running the above program increments each element of the array by 1. If we sum all the elements afterwards, we get the following result:

`500500`

The example and concept are very similar to the previous example, except that `invoke` and `join` do not return any result. Instead, the tasks increment the array elements in place. Here, we have set the threshold to 10 and break the problem into subproblems if the size of the input is greater than 10.
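Since no result has to be combined, the fork, invoke and join steps of a `RecursiveAction` can also be collapsed into a single call to the static `invokeAll` method inherited from `ForkJoinTask`. The sketch below assumes a different action (doubling every element) and uses invented names (`DoubleAllSketch`, `doubleAll`) to show that variant.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

public class DoubleAllSketch extends RecursiveAction {

    private static final int THRESHOLD = 10; // same cutoff as in the text above

    private final int[] array;
    private final int start; // inclusive
    private final int end;   // exclusive

    public DoubleAllSketch(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                array[i] *= 2;
            }
            return;
        }
        int mid = start + (end - start) / 2;
        // invokeAll runs both halves and returns once both have completed.
        invokeAll(new DoubleAllSketch(array, start, mid),
                  new DoubleAllSketch(array, mid, end));
    }

    // Returns a doubled copy so that the caller's array is left untouched.
    public static int[] doubleAll(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool.commonPool().invoke(new DoubleAllSketch(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        int[] doubled = doubleAll(IntStream.rangeClosed(1, 100).toArray());
        System.out.println(Arrays.stream(doubled).sum()); // 10100
    }
}
```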

You can download the full source code of this example here: Java Fork and Join

### Rajagopal ParthaSarathi

Rajagopal works in software industry solving enterprise-scale problems for customers across geographies specializing in distributed platforms. He holds a masters in computer science with focus on cloud computing from Illinois Institute of Technology. His current interests include data science and distributed computing.