java.util.concurrent.ForkJoinWorkerThread Example
In this post, we are going to discuss the class java.util.concurrent.ForkJoinWorkerThread
and give you an idea of how you can use it in your own code when building robust multi-threaded applications.
1. ForkJoinWorkerThread Class
One of the most interesting features of Java 7 is the Fork/Join framework. It is an implementation of the Executor and ExecutorService interfaces that lets you execute Callable and Runnable tasks without managing the threads that run them.
This executor is designed for tasks that can be divided into smaller parts. Its main components are as follows:
- A special kind of task, implemented by the ForkJoinTask class.
- Two operations: one for dividing a task into subtasks (the fork operation) and one for waiting for those subtasks to finish (the join operation).
- An algorithm, called the work-stealing algorithm, that optimizes the use of the threads in the pool. When a task is waiting for its subtasks, the thread that was executing it is used to execute another task.
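To make the fork and join operations concrete, here is a minimal sketch of a task that sums an array by splitting it in half. The class names (ForkJoinSumSketch, SumTask) and the threshold of 1,000 elements are assumptions made for illustration; they are not part of the example project discussed later in this post.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class; the names and the threshold are illustrative only.
public class ForkJoinSumSketch {

    // Sums array[start, end) by splitting the range until it is small enough.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cut-off for sequential work
        private final int[] array;
        private final int start;
        private final int end;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = (start + end) >>> 1;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            left.fork();                      // fork: schedule the left half asynchronously
            long rightSum = right.compute();  // keep this thread busy with the right half
            long leftSum = left.join();       // join: wait for the left half's result
            return leftSum + rightSum;
        }
    }

    // Runs the task on a fresh pool and shuts the pool down afterwards.
    static long sum(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        Arrays.fill(data, 1);
        System.out.println(sum(data)); // prints 10000
    }
}
```

While a worker is blocked in join(), the pool can use it (or an idle worker that steals the forked subtask) to run other pending work, which is the behaviour the work-stealing bullet describes.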
The main class of the Fork/Join framework is the ForkJoinPool class. Internally, it has the following two elements:
- A queue of tasks that are waiting to be executed
- A pool of threads that execute the tasks
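As a rough illustration of these two elements, the sketch below submits one task to a pool and then reads a few of the pool's monitoring getters. The class name PoolInspectionSketch and the trivial task are assumptions for this example; the printed thread and queue figures are snapshots and will differ from run to run.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical demo class; the name and the trivial task are illustrative only.
public class PoolInspectionSketch {

    // Submits one task to a pool with the given parallelism and returns its result.
    static int runOnPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> work = () -> 6 * 7;
            // submit() enqueues the task and returns a handle immediately
            ForkJoinTask<Integer> handle = pool.submit(work);
            int result = handle.join(); // blocks until a worker thread has run the task

            // Monitoring getters: the pool's target parallelism, the number of
            // worker threads started so far, and tasks still waiting to be picked up.
            System.out.println("parallelism = " + pool.getParallelism());
            System.out.println("pool size = " + pool.getPoolSize());
            System.out.println("queued submissions = " + pool.getQueuedSubmissionCount());
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result = " + runOnPool(2));
    }
}
```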
2. Executing some code
WorkerThread.java
package com.javacodegeeks.examples.forkjoinworkerthread.threads;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

/*
 * This class implements a custom thread for the Fork/Join framework. It extends
 * ForkJoinWorkerThread, the default implementation of the threads that execute
 * the tasks in the Fork/Join framework. This custom thread counts the number of
 * tasks executed in it.
 */
public class WorkerThread extends ForkJoinWorkerThread {
    // Per-thread counter of executed tasks
    private static ThreadLocal<Integer> taskCounter = new ThreadLocal<>();

    public WorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        super.onStart(); // must be called first
        System.out.printf("WorkThread %d: Initializing task counter.\n", this.getId());
        taskCounter.set(0);
    }

    @Override
    protected void onTermination(Throwable exception) {
        System.out.printf("WorkerThread %d: %d\n", getId(), taskCounter.get());
        super.onTermination(exception); // must be called last
    }

    // Called by each task to record that it ran on this thread
    public void addTask() {
        int counter = taskCounter.get().intValue();
        counter++;
        taskCounter.set(counter);
    }
}
WorkerThreadFactory.java
package com.javacodegeeks.examples.forkjoinworkerthread.factories;

//~--- non-JDK imports --------------------------------------------------------

import com.javacodegeeks.examples.forkjoinworkerthread.threads.WorkerThread;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

/*
 * Factory to be used by the Fork/Join framework to create the worker threads. Implements
 * the ForkJoinWorkerThreadFactory interface
 */
public class WorkerThreadFactory implements ForkJoinWorkerThreadFactory {
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new WorkerThread(pool);
    }
}
ARecursiveTask.java
package com.javacodegeeks.examples.forkjoinworkerthread.tasks;

//~--- non-JDK imports --------------------------------------------------------

import com.javacodegeeks.examples.forkjoinworkerthread.threads.WorkerThread;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/*
 * Task that will be executed in the Fork/Join framework. It calculates
 * the sum of all array elements
 */
public class ARecursiveTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = -4702976772011326493L;

    // Array to be summed
    private int[] intArray;

    // Start and end positions of the part of the array to be summed by this task
    private int start, end;

    public ARecursiveTask(int[] array, int start, int end) {
        this.intArray = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        Integer ret;
        // Record this task on the worker thread that is running it
        WorkerThread thread = (WorkerThread) Thread.currentThread();
        thread.addTask();
        if (end - start > 100) {
            // Range is too large: split it in two and run both halves
            int mid = (start + end) / 2;
            ARecursiveTask task1 = new ARecursiveTask(intArray, start, mid);
            ARecursiveTask task2 = new ARecursiveTask(intArray, mid, end);
            invokeAll(task1, task2);
            ret = addResults(task1, task2);
        } else {
            // Range is small enough: sum it directly
            int add = 0;
            for (int i = start; i < end; i++) {
                add += intArray[i];
            }
            ret = Integer.valueOf(add);
        }
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ret;
    }

    private Integer addResults(ARecursiveTask task1, ARecursiveTask task2) {
        int value;
        try {
            value = task1.get().intValue() + task2.get().intValue();
        } catch (InterruptedException e) {
            e.printStackTrace();
            value = 0;
        } catch (ExecutionException e) {
            e.printStackTrace();
            value = 0;
        }
        return Integer.valueOf(value);
    }
}
Let’s explain the methods used in the previous code:
- protected void onStart(): Initializes internal state after construction but before processing any tasks. If you override this method, you must invoke super.onStart() at the beginning of the method. Initialization requires care: most fields must have legal default values, to ensure that attempted accesses from other threads work correctly even before this thread starts processing tasks.
- protected void onTermination(Throwable exception): Performs cleanup associated with termination of this worker thread. If you override this method, you must invoke super.onTermination at the end of the overridden method.
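The ordering rules for the two hooks can be seen in a small self-contained sketch. It plugs a custom worker thread into a pool through the four-argument ForkJoinPool constructor and counts how often each hook fires. The class names (LifecycleHooksSketch, CountingWorker), the parallelism of 2, and the five-second timeout are assumptions chosen for illustration, not taken from the example project.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; all names here are illustrative only.
public class LifecycleHooksSketch {

    // Worker thread that records when its lifecycle hooks run.
    static class CountingWorker extends ForkJoinWorkerThread {
        private final AtomicInteger started;
        private final AtomicInteger terminated;

        CountingWorker(ForkJoinPool pool, AtomicInteger started, AtomicInteger terminated) {
            super(pool);
            this.started = started;
            this.terminated = terminated;
        }

        @Override
        protected void onStart() {
            super.onStart();                 // superclass call comes first
            started.incrementAndGet();
        }

        @Override
        protected void onTermination(Throwable exception) {
            terminated.incrementAndGet();
            super.onTermination(exception);  // superclass call comes last
        }
    }

    // Runs one trivial task and returns {workers started, workers terminated}.
    static int[] runOnce() {
        AtomicInteger started = new AtomicInteger();
        AtomicInteger terminated = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(
                2,                                                // parallelism (assumed value)
                p -> new CountingWorker(p, started, terminated),  // custom thread factory
                null,                                             // no uncaught-exception handler
                false);                                           // asyncMode off (the default)
        Runnable noop = () -> { };
        pool.submit(noop).join();            // forces at least one worker to be created
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { started.get(), terminated.get() };
    }

    public static void main(String[] args) {
        int[] counts = runOnce();
        System.out.println("started = " + counts[0] + ", terminated = " + counts[1]);
    }
}
```

Once the pool has terminated, the two counts should match, since every worker that ran onStart() also runs onTermination() before it leaves the pool. How many workers are created for a single task may vary between runs and JDK versions.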
Running the class com.javacodegeeks.examples.forkjoinworkerthread.App should produce output similar to:
WorkThread 8: Initializing task counter.
WorkThread 9: Initializing task counter.
WorkThread 10: Initializing task counter.
WorkThread 11: Initializing task counter.
WorkerThread 10: 543
WorkerThread 9: 448
WorkerThread 8: 513
WorkerThread 11: 543
Main: Result: 100000
Main: End of the program
3. Download the Eclipse project of this tutorial:
This was an example of how to use the ForkJoinWorkerThread class.
You can download the full source code of this example here: forkjoinworkerthread.zip