java.util.concurrent.ForkJoinWorkerThread Example

In this post, we discuss the class java.util.concurrent.ForkJoinWorkerThread and give you an idea of how you can use it in your own code when building robust multi-threaded applications.

1. ForkJoinWorkerThread Class

One of the most interesting features of Java 7 is the Fork/Join framework. It is an implementation of the Executor and ExecutorService interfaces that lets you execute Callable and Runnable tasks without managing the threads that run them.
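To make the ExecutorService point concrete, here is a minimal sketch that treats a ForkJoinPool as a plain ExecutorService and submits a Callable to it. The class name ExecutorViewSketch, the method answer() and the parallelism of 2 are assumptions chosen for illustration, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical example class, not part of the original project
class ExecutorViewSketch {

    // Submits a Callable through the ExecutorService interface and waits for its result
    static int answer() {
        // ForkJoinPool implements ExecutorService, so it can be used through that type
        ExecutorService executor = new ForkJoinPool(2);
        try {
            Callable<Integer> job = () -> 6 * 7;
            Future<Integer> future = executor.submit(job);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // The pool owns the threads; the caller only has to shut it down
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(answer()); // prints 42
    }
}
```

Note that the caller never creates or starts a thread; the pool decides which worker runs the job.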

This executor is oriented to execute tasks that can be divided into smaller parts. Its main components are as follows:

  • A special kind of task, implemented by the ForkJoinTask class.
  • Two operations: one for dividing a task into subtasks (the fork operation) and one for waiting for those subtasks to finish (the join operation).
  • An algorithm, called the work-stealing algorithm, that optimizes the use of the threads of the pool. When a task is waiting for its subtasks, the thread that was executing it is used to execute another task.

The main class of the Fork/Join framework is the ForkJoinPool class. Internally, it has the following two elements:

  • A queue of tasks that are waiting to be executed
  • A pool of threads that execute the tasks
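The fork and join operations and the pool can be sketched together in a few lines. The example below is only an illustration under stated assumptions: the names ForkJoinSketch, RangeSum and sumRange, the threshold of 1,000 and the parallelism of 4 are all hypothetical and do not come from the project of this post.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class, not part of the original project
class ForkJoinSketch {

    // Sums the integers in [from, to) by splitting the range in halves
    static class RangeSum extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {          // small enough: sum directly
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            left.fork();                       // fork: queue the left half so another worker can take it
            long rightSum = right.compute();   // keep working on the right half in this thread
            return left.join() + rightSum;     // join: wait for the left half
        }
    }

    // Runs the task in a dedicated pool and shuts the pool down afterwards
    static long sumRange(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumRange(0, 10_000)); // prints 49995000
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of parking it while both halves run elsewhere.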

2. Executing some code

WorkerThread.java

package com.javacodegeeks.examples.forkjoinworkerthread.threads;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

/*
* This class implements a custom thread for the Fork/Join framework. It extends
* ForkJoinWorkerThread, the default implementation of the threads that execute
* the tasks in the Fork/Join framework. This custom thread counts the number of
* tasks executed in it.
*/
public class WorkerThread extends ForkJoinWorkerThread {
    // Per-thread counter of the tasks executed by this worker
    private static final ThreadLocal<Integer> taskCounter = new ThreadLocal<>();

    public WorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        super.onStart();
        System.out.printf("WorkThread %d: Initializing task counter.\n", this.getId());
        taskCounter.set(0);
    }

    @Override
    protected void onTermination(Throwable exception) {
        System.out.printf("WorkerThread %d: %d\n", getId(), taskCounter.get());
        super.onTermination(exception);
    }

    public void addTask() {
        int counter = taskCounter.get().intValue();

        counter++;
        taskCounter.set(counter);
    }
}

WorkerThreadFactory.java

package com.javacodegeeks.examples.forkjoinworkerthread.factories;

//~--- non-JDK imports --------------------------------------------------------

import com.javacodegeeks.examples.forkjoinworkerthread.threads.WorkerThread;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

/*
* Factory to be used by the Fork/Join framework to create the worker threads. Implements
* the ForkJoinWorkerThreadFactory interface
*/
public class WorkerThreadFactory implements ForkJoinWorkerThreadFactory {
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new WorkerThread(pool);
    }
}

ARecursiveTask.java

package com.javacodegeeks.examples.forkjoinworkerthread.tasks;

//~--- non-JDK imports --------------------------------------------------------

import com.javacodegeeks.examples.forkjoinworkerthread.threads.WorkerThread;

//~--- JDK imports ------------------------------------------------------------

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/*
* Task that will be executed in the Fork/Join framework. It calculates
* the sum of all array elements
*/
public class ARecursiveTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = -4702976772011326493L;

    // Array to be summed
    private int[] intArray;

    // Start and end positions of the part of the array to be summed by this task
    private int start, end;

    public ARecursiveTask(int[] array, int start, int end) {
        this.intArray = array;
        this.start    = start;
        this.end      = end;
    }

    @Override
    protected Integer compute() {
        Integer      ret;
        WorkerThread thread = (WorkerThread) Thread.currentThread();

        thread.addTask();

        if (end - start > 100) {
            int            mid   = (start + end) / 2;
            ARecursiveTask task1 = new ARecursiveTask(intArray, start, mid);
            ARecursiveTask task2 = new ARecursiveTask(intArray, mid, end);

            invokeAll(task1, task2);
            ret = addResults(task1, task2);
        } else {
            int add = 0;

            for (int i = start; i < end; i++) {
                add += intArray[i];
            }

            ret = Integer.valueOf(add);
        }

        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return ret;
    }

    private Integer addResults(ARecursiveTask task1, ARecursiveTask task2) {
        int value;

        try {
            value = task1.get().intValue() + task2.get().intValue();
        } catch (InterruptedException e) {
            e.printStackTrace();
            value = 0;
        } catch (ExecutionException e) {
            e.printStackTrace();
            value = 0;
        }

        return Integer.valueOf(value);
    }
}

Let’s explain the methods used in the previous code:

  • protected void onStart() – Initializes internal state after construction but before processing any tasks. If you override this method, you must invoke super.onStart() at the beginning of the method. Initialization requires care: Most fields must have legal default values, to ensure that attempted accesses from other threads work correctly even before this thread starts processing tasks.
  • protected void onTermination(Throwable exception) – Performs cleanup associated with termination of this worker thread. If you override this method, you must invoke super.onTermination at the end of the overridden method.
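The App class named in the command below is not listed in this post. The following single-file sketch shows one way such a driver could look; it is an assumption, not the author's code. CountingThread and SumTask are compact stand-ins for WorkerThread and ARecursiveTask, the pool size of 4 and the array of 100,000 ones are inferred from the sample output, and the method reference used as the factory assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

// Hypothetical driver; the real App class is not shown in this post
class AppSketch {

    // Stand-in for WorkerThread: counts the tasks it runs and reports on termination
    static class CountingThread extends ForkJoinWorkerThread {
        private int tasks; // only read and written by this thread itself

        CountingThread(ForkJoinPool pool) {
            super(pool);
        }

        void addTask() {
            tasks++;
        }

        @Override
        protected void onTermination(Throwable exception) {
            System.out.printf("CountingThread %d: %d%n", getId(), tasks);
            super.onTermination(exception); // must be the last call, as described above
        }
    }

    // Stand-in for ARecursiveTask: sums the slice [start, end) of the array
    static class SumTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private final int[] data;
        private final int start;
        private final int end;

        SumTask(int[] data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            // Guarded cast: only count when a custom worker runs the task
            Thread current = Thread.currentThread();
            if (current instanceof CountingThread) {
                ((CountingThread) current).addTask();
            }
            if (end - start <= 100) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (start + end) / 2;
            SumTask left = new SumTask(data, start, mid);
            SumTask right = new SumTask(data, mid, end);
            invokeAll(left, right);
            return left.join() + right.join();
        }
    }

    // Builds a pool with the custom thread factory, sums an array of ones, then shuts down
    static int run(int size) {
        int[] array = new int[size];
        Arrays.fill(array, 1);
        // CountingThread::new plays the role of WorkerThreadFactory.newThread(pool)
        ForkJoinPool pool = new ForkJoinPool(4, CountingThread::new, null, false);
        int result = pool.invoke(new SumTask(array, 0, array.length));
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.printf("Main: Result: %d%n", run(100_000));
        System.out.println("Main: End of the program");
    }
}
```

The per-thread counts printed on termination vary from run to run, because work stealing distributes the subtasks differently each time.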

The output of the command

com.javacodegeeks.examples.forkjoinworkerthread.App

should be similar to:

WorkThread 8: Initializing task counter.
WorkThread 9: Initializing task counter.
WorkThread 10: Initializing task counter.
WorkThread 11: Initializing task counter.
WorkerThread 10: 543
WorkerThread 9: 448
WorkerThread 8: 513
WorkerThread 11: 543
Main: Result: 100000
Main: End of the program

3. Download the Eclipse project of this tutorial:

This was an example of how to use the ForkJoinWorkerThread class.

Download
You can download the full source code of this example here: forkjoinworkerthread.zip

Armando Flores

Armando graduated as an Electronics Engineer from the Public University of Puebla (BUAP). He also has a Master's degree in Computer Sciences from CINVESTAV. He has been using the Java language for Web Development for over a decade. He has been involved in a large number of projects focused on "ad-hoc" Web Applications based on Java EE and the Spring Framework.