Java Fork and Join
In this tutorial, we will look at the Fork/Join framework in Java with examples. The framework is designed to keep all available processor cores busy by recursively splitting a task into smaller subtasks that can run in parallel (a divide-and-conquer approach).
The fundamental idea is captured by the following algorithm from the original Fork/Join paper:
Result solve(Problem problem)
{
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
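The pseudocode maps fairly directly onto Java. Below is a minimal sketch, assuming the problem is summing a long[] array; the class name SumTask and the THRESHOLD value of 1,000 are illustrative choices, not part of the original. It extends RecursiveTask (covered in section 3), and each pseudocode step is marked with a comment.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: the "solve(Problem)" template applied to summing a range of a long[].
public class SumTask extends RecursiveTask<Long> {
    // Assumed cutoff below which we solve directly; tune for real workloads.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    public SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // "if (problem is small) directly solve problem"
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // "split problem into independent parts"
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        // "fork new subtasks to solve each part": fork one half, compute the other here
        left.fork();
        long rightSum = right.compute();
        // "join all subtasks" and "compose result from subresults"
        return left.join() + rightSum;
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        long total = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total); // 1 + 2 + ... + 100000 = 5000050000
    }
}
```

Forking one half and computing the other in the current thread, rather than forking both, keeps the calling worker busy; the examples later in this tutorial follow the same pattern.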
The framework provides efficient construction and management of task queues and worker threads. The heart of the Fork/Join framework lies in its lightweight scheduling mechanics and work-stealing. Each worker thread maintains a double-ended queue (deque) of tasks: the worker itself uses one end in LIFO order, while other threads can take tasks from the opposite end in FIFO order.
By default, a worker thread takes tasks from the head of its own deque. When its deque is empty, the thread steals a task from the tail of another busy thread's deque, or takes one from the global entry queue. The tail holds the oldest tasks, which are likely to be the biggest pieces of work. Because owners and thieves work at opposite ends of a deque, this approach minimizes contention between threads competing for tasks.
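The two access patterns can be illustrated with a single-threaded simulation using java.util.ArrayDeque. This is only a sketch of the idea, not the pool's actual internal queue (which is a specialized concurrent structure); the class DequeDemo and the task names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Single-threaded illustration of how an owner and a thief use opposite ends of one deque.
public class DequeDemo {
    static String[] simulate() {
        Deque<String> deque = new ArrayDeque<>();

        // The owning worker pushes subtasks onto the head as it splits a problem:
        // first the large task, then progressively smaller pieces.
        deque.push("task-large");   // oldest, biggest piece (ends up at the tail)
        deque.push("task-medium");
        deque.push("task-small");   // newest, smallest piece (at the head)

        // The owner takes from the head, the same end it pushes to (LIFO).
        String ownerTakes = deque.pop();

        // An idle thief takes from the tail (FIFO): the oldest, biggest piece.
        String thiefSteals = deque.pollLast();

        return new String[] {ownerTakes, thiefSteals, deque.toString()};
    }

    public static void main(String[] args) {
        String[] r = simulate();
        System.out.println("owner took: " + r[0]);  // owner took: task-small
        System.out.println("thief stole: " + r[1]); // thief stole: task-large
        System.out.println("remaining: " + r[2]);   // remaining: [task-medium]
    }
}
```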
1. Fork & Join
Let us use the framework to compute a Fibonacci number. In the Fibonacci sequence, each number is the sum of the two preceding ones, starting from 0 and 1. For example, to print F(10), the number at index 10 in the sequence, we have to calculate F(8) and F(9) and add them up.
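For reference, here is a plain sequential sketch of the recurrence F(n) = F(n - 1) + F(n - 2), with F(0) = 0 and F(1) = 1. It does not use Fork/Join at all; it is only meant to make the worked example concrete and to give a value to check the parallel program against. The class name FibonacciSequence is hypothetical.

```java
// Iterative computation of F(n) = F(n - 1) + F(n - 2), with F(0) = 0 and F(1) = 1.
public class FibonacciSequence {
    static long fib(int n) {
        if (n == 0) {
            return 0;
        }
        long prev = 0; // F(i - 2)
        long curr = 1; // F(i - 1); equals F(1) before the loop
        for (int i = 2; i <= n; i++) {
            long next = prev + curr;
            prev = curr;
            curr = next;
        }
        return curr;
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 10; n++) {
            System.out.print(fib(n) + " "); // 0 1 1 2 3 5 8 13 21 34 55
        }
        System.out.println();
        System.out.println(fib(8) + " + " + fib(9) + " = " + fib(10)); // 21 + 34 = 55
    }
}
```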
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class Fibonacci extends ForkJoinTask<Integer> {
    private int input;
    private int sum = 0;

    public Fibonacci(int input) {
        this.input = input;
    }

    // Returns the computed value, or null while the task has not completed.
    @Override
    public Integer getRawResult() {
        if (!isDone())
            return null;
        return sum;
    }

    @Override
    protected void setRawResult(Integer value) {
        this.sum = value;
    }

    // Main computation; returning true marks the task as completed normally.
    @Override
    protected boolean exec() {
        // Small problem: compute sequentially.
        if (input <= 5) {
            sum = seqFib(input);
            return true;
        }
        // Split into two subproblems: F(n - 1) and F(n - 2).
        Fibonacci task1 = new Fibonacci(input - 1);
        Fibonacci task2 = new Fibonacci(input - 2);
        task1.fork();                        // schedule task1 for asynchronous execution
        sum = task2.invoke() + task1.join(); // compute task2 here, then wait for task1
        return true;
    }

    // Plain recursive Fibonacci used below the threshold.
    int seqFib(int n) {
        if (n <= 1)
            return n;
        return seqFib(n - 1) + seqFib(n - 2);
    }

    public static void main(String[] args) {
        Fibonacci task = new Fibonacci(40);
        System.out.println(new ForkJoinPool().invoke(task));
    }
}
Running the above program produces the below result:
102334155
This program takes the index of the Fibonacci number to compute as input through the constructor, which stores it in the field input. To plug into the Fork/Join framework, our class must extend the abstract class ForkJoinTask, which ensures that a ForkJoinPool can execute this task. We will discuss ForkJoinPool in the next section.
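The main execution method is exec. In the example above, we break the problem into two subproblems that mirror the Fibonacci recurrence, F(n - 1) and F(n - 2). The first task is forked, which means it is scheduled for asynchronous execution: it is pushed onto the current worker's deque, where an idle worker thread may steal it and run it in parallel.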
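Next is the join phase, in which the results of the subtasks are combined to produce the final result. Here we invoke the second task and add its result to the result of the first task. The invoke method starts executing the task (that is, calls its exec method) in the current thread, waits for it to complete, and returns its result. The join method waits for the forked first task to complete and returns its result. Because Java evaluates the operands of + from left to right, the current thread computes task2 before it waits on task1, so it does useful work instead of sitting idle while task1 may be running on another thread.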
This, in a nutshell, is the concept behind Java's Fork/Join framework. As stated in the paper, not every subproblem needs to be computed in parallel: below some size, the overhead of creating and scheduling tasks outweighs the benefit. In our example, we compute the value sequentially with the seqFib method when the input is 5 or less.
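To see why the cutoff matters, here is a hypothetical variant of the Fibonacci example that counts how many task objects are created for a given threshold. The class FibThreshold, its threshold parameter, and the AtomicLong counter are assumptions for illustration. The count depends only on the shape of the recursion, not on scheduling, so it shrinks deterministically as the threshold grows.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Fibonacci task with a configurable sequential cutoff and a task counter.
public class FibThreshold extends RecursiveTask<Long> {
    private final int n;
    private final int threshold;
    private final AtomicLong taskCount;

    FibThreshold(int n, int threshold, AtomicLong taskCount) {
        this.n = n;
        this.threshold = threshold;
        this.taskCount = taskCount;
        taskCount.incrementAndGet(); // count every task object created
    }

    @Override
    protected Long compute() {
        if (n <= threshold) {
            return seqFib(n); // small problem: solve directly, no new tasks
        }
        FibThreshold f1 = new FibThreshold(n - 1, threshold, taskCount);
        FibThreshold f2 = new FibThreshold(n - 2, threshold, taskCount);
        f1.fork();
        return f2.compute() + f1.join();
    }

    static long seqFib(int n) {
        return n <= 1 ? n : seqFib(n - 1) + seqFib(n - 2);
    }

    // Returns {result, number of tasks created} for the given cutoff.
    static long[] run(int n, int threshold) {
        AtomicLong count = new AtomicLong();
        long result = ForkJoinPool.commonPool().invoke(new FibThreshold(n, threshold, count));
        return new long[] {result, count.get()};
    }

    public static void main(String[] args) {
        for (int threshold : new int[] {5, 15, 25}) {
            long[] r = run(30, threshold); // result is F(30) = 832040 for every threshold
            System.out.println("threshold=" + threshold + " result=" + r[0] + " tasks=" + r[1]);
        }
    }
}
```

The result is identical for every threshold; only the number of tasks, and therefore the scheduling overhead, changes. Timing a few thresholds on the target machine is the practical way to pick a value.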
2. ForkJoinPool
ForkJoinPool is the heart of the framework. It is an implementation of ExecutorService that manages worker threads and provides methods to query the state and performance of the thread pool.
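As a sketch of those monitoring methods, the following program runs a small hypothetical task (Count, which just counts the elements of a range) on a pool with an explicit parallelism of 4, then prints a few statistics. getParallelism, getPoolSize, getStealCount, getActiveThreadCount, and getQueuedTaskCount are ForkJoinPool methods; apart from the result and the parallelism, the printed values vary from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch: running a task on a dedicated pool and inspecting the pool afterwards.
public class PoolStats {
    // Hypothetical task that counts the elements in [lo, hi).
    static class Count extends RecursiveTask<Integer> {
        private final int lo;
        private final int hi;

        Count(int lo, int hi) {
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected Integer compute() {
            if (hi - lo <= 1_000) {
                return hi - lo;
            }
            int mid = (lo + hi) >>> 1;
            Count left = new Count(lo, mid);
            left.fork();
            return new Count(mid, hi).compute() + left.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4); // explicit parallelism level of 4
        int result = pool.invoke(new Count(0, 1_000_000));
        System.out.println("result      = " + result);                  // 1000000
        System.out.println("parallelism = " + pool.getParallelism());   // 4
        System.out.println("pool size   = " + pool.getPoolSize());
        System.out.println("steal count = " + pool.getStealCount());
        System.out.println("active      = " + pool.getActiveThreadCount());
        System.out.println("queued      = " + pool.getQueuedTaskCount());
        pool.shutdown();
    }
}
```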
Worker threads can execute only one task at a time, but ForkJoinPool doesn't create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (deque) that stores its pending tasks.
public static void main(String[] args) {
    Fibonacci task = new Fibonacci(40);
    System.out.println(new ForkJoinPool().invoke(task));
}
Here we create a new ForkJoinPool and invoke the main task to print F(40), the Fibonacci number at index 40 (counting from F(0) = 0). A pool created with the no-argument constructor uses a parallelism level equal to the number of available processors. The pool's invoke method waits for the task to complete and then returns its result, which it obtains by calling getRawResult. In our getRawResult, we check isDone(), which becomes true once exec has returned true, and if so return the computed value, 102334155.
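To make the getRawResult contract concrete, here is a minimal, hypothetical ForkJoinTask (SquareTask, which squares its input) written in the same style as Fibonacci. Before the task runs, isDone() is false and getRawResult returns null; after the pool's invoke completes, both reflect the finished result.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical minimal ForkJoinTask following the same pattern as Fibonacci.
public class SquareTask extends ForkJoinTask<Integer> {
    private final int input;
    private int result;

    public SquareTask(int input) {
        this.input = input;
    }

    // Null until the task has completed, exactly like Fibonacci.getRawResult.
    @Override
    public Integer getRawResult() {
        if (!isDone())
            return null;
        return result;
    }

    @Override
    protected void setRawResult(Integer value) {
        this.result = value;
    }

    @Override
    protected boolean exec() {
        result = input * input;
        return true; // true marks the task as completed normally
    }

    public static void main(String[] args) {
        SquareTask task = new SquareTask(7);
        System.out.println(task.isDone() + " " + task.getRawResult()); // false null
        ForkJoinPool pool = new ForkJoinPool();
        Integer value = pool.invoke(task);
        System.out.println(task.isDone() + " " + value);              // true 49
        pool.shutdown();
    }
}
```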
3. RecursiveTask
In the above example, we extended ForkJoinTask directly to use the Fork/Join framework. The framework also provides an easier abstraction over ForkJoinTask, RecursiveTask, which removes the need to maintain the result state and check for completion ourselves.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class Factorial extends RecursiveTask<Double> {
    private long start;
    private long end;
    // Ranges containing at most this many numbers are multiplied sequentially.
    public static final long threshold = 5;

    public Factorial(long number) {
        this(1, number);
    }

    private Factorial(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        long length = (end - start + 1);
        if (length <= threshold) {
            return factorial();
        }
        // Split [start, end] into [start, start + mid] and [start + mid + 1, end].
        long mid = length / 2;
        Factorial firstTask = new Factorial(start, start + mid);
        Factorial secondTask = new Factorial(start + mid + 1, end);
        firstTask.fork();                               // schedule the first half asynchronously
        return secondTask.compute() * firstTask.join(); // compute the second half here, then combine
    }

    // Sequential product start * (start + 1) * ... * end.
    private Double factorial() {
        double result = 1.0;
        for (long i = start; i <= end; i++) {
            result *= i;
        }
        return result;
    }

    public static void main(String[] args) {
        ForkJoinTask<Double> task = new Factorial(100);
        System.out.println(ForkJoinPool.commonPool().invoke(task));
    }
}
Running the above program produces the below result:
9.332621544394414E157
In this example, compute is the main workhorse method. Similar to the previous example:
- If the range contains at most 5 numbers (the threshold), we compute the product sequentially.
- Otherwise, we split the range into two subproblems, forking one and computing the other directly in the current thread.
- The next step is joining the results of both tasks by multiplying them.
- Each subtask recursively performs the same steps, and finally the partial products are combined.
In this example, we do not store the result in a field ourselves; compute simply returns it. RecursiveTask keeps the returned value and tracks completion for us, so invoke and join return the result directly. Also, the ForkJoinPool documentation notes that the static common pool is appropriate for most applications and normally reduces resource usage, which is why the main method uses ForkJoinPool.commonPool().
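The Double result above is an approximation: a double holds only about 15 to 17 significant decimal digits, and n! exceeds Double.MAX_VALUE for n of 171 or more, where this example would print Infinity. Below is a minimal sketch of an exact variant, assuming BigInteger arithmetic is acceptable for your use case. The class name BigFactorial is hypothetical, and it splits the range at its midpoint rather than using the original's offset calculation.

```java
import java.math.BigInteger;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of Factorial that multiplies BigInteger values for an exact result.
public class BigFactorial extends RecursiveTask<BigInteger> {
    private static final long THRESHOLD = 5; // same cutoff as the Double version
    private final long start;
    private final long end;

    public BigFactorial(long number) {
        this(1, number);
    }

    private BigFactorial(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected BigInteger compute() {
        if (end - start + 1 <= THRESHOLD) {
            // Sequential product of the small range (empty range gives 1, so 0! = 1).
            BigInteger result = BigInteger.ONE;
            for (long i = start; i <= end; i++) {
                result = result.multiply(BigInteger.valueOf(i));
            }
            return result;
        }
        long mid = (start + end) >>> 1;
        BigFactorial left = new BigFactorial(start, mid);
        BigFactorial right = new BigFactorial(mid + 1, end);
        left.fork();
        return right.compute().multiply(left.join());
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new BigFactorial(20))); // 2432902008176640000
        System.out.println(ForkJoinPool.commonPool().invoke(new BigFactorial(100)));
    }
}
```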
4. RecursiveAction
RecursiveAction is very similar to RecursiveTask except that it does not return a result, so it is used when an action should be performed in parallel. We can look at the concept with the example below, where we increment every element of an array by 1.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;

public class ForkJoinAdd extends RecursiveAction {
    private int start;
    private int end;
    private int[] array;

    public ForkJoinAdd(int start, int end, int[] array) {
        this.start = start;
        this.end = end;
        this.array = array;
    }

    @Override
    protected void compute() {
        int length = (end - start + 1);
        // Small range: increment the elements sequentially.
        if (length <= 10) {
            for (int i = start; i <= end; i++) {
                array[i] += 1;
            }
            return;
        }
        // Split [start, end] into two disjoint halves.
        int mid = length / 2;
        RecursiveAction firstTask = new ForkJoinAdd(start, start + mid - 1, array);
        RecursiveAction secondTask = new ForkJoinAdd(start + mid, end, array);
        firstTask.fork();
        secondTask.invoke();
        firstTask.join();
    }

    public static void main(String[] args) {
        int numSize = 1_000;
        final int[] array = new int[numSize];
        IntStream.range(0, array.length).forEach(index -> array[index] = index);
        ForkJoinAdd forkJoinAdd = new ForkJoinAdd(0, numSize - 1, array);
        ForkJoinPool.commonPool().invoke(forkJoinAdd);
        // The elements are now 1..1000; print their sum.
        System.out.println(IntStream.of(array).sum());
    }
}
Running the above program increments each element of the array by 1, so the elements change from 0..999 to 1..1000. Summing all the elements afterwards gives 1 + 2 + ... + 1000 = 1000 * 1001 / 2:
500500
The example and concept are very similar to the previous one, except that invoke and join do not return a meaningful result (for a RecursiveAction they return null). Instead, the tasks modify the shared array in place, which is safe because each task writes to a disjoint range of indices. Here, we set the threshold to 10 and split the problem into subproblems whenever the size of the range is greater than 10.
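ForkJoinTask also provides a static invokeAll(t1, t2) method, which forks the given tasks and returns once both are done; it can replace the explicit fork/invoke/join sequence. The following sketch uses it with the same threshold of 10 but splits on half-open ranges [from, to). The class DoubleAll, which doubles every element in place, is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch: an in-place array update like ForkJoinAdd, split with ForkJoinTask.invokeAll.
public class DoubleAll extends RecursiveAction {
    private static final int THRESHOLD = 10; // same cutoff as the ForkJoinAdd example
    private final int[] array;
    private final int from; // inclusive
    private final int to;   // exclusive

    public DoubleAll(int[] array, int from, int to) {
        this.array = array;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                array[i] *= 2; // each task writes only its own disjoint index range
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll returns only after both subtasks have completed.
        invokeAll(new DoubleAll(array, from, mid), new DoubleAll(array, mid, to));
    }

    public static void main(String[] args) {
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
        ForkJoinPool.commonPool().invoke(new DoubleAll(array, 0, array.length));
        System.out.println(Arrays.toString(array));
        // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30]
    }
}
```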
5. Download the Source code
In this tutorial, we explored the Java Fork/Join framework using examples.
You can download the full source code of this example here: Java Fork and Join