Creating and Starting Java Threads

In this post, we feature a comprehensive article on Creating and Starting Java Threads.

1. Introduction

This article aims at a simplified understanding of creating and starting threads via an example-based approach. The examples and code samples are based on JDK 8.

 

2. Java Threads

A thread is the smallest unit of execution that an operating system can schedule. In Java, each thread is associated with an instance of java.lang.Thread. There are 2 types of threads.

  • User Thread
  • Daemon Thread

User threads are created by the application or the user. The JVM does not exit until all the user threads complete their execution.

Daemon threads are usually background threads, such as those the JVM creates for garbage collection. The JVM does not wait for daemon threads to complete before it exits. A thread's daemon status is independent of its priority: a new thread inherits both from the thread that creates it, and setDaemon() does not change the priority.

DaemonThreadEx.java

import java.util.stream.IntStream;

public class DaemonThreadEx extends Thread
{

    @Override
    public void run(){
        System.out.println("Is started thread daemon? " + this.isDaemon());
        System.out.println("Thread execution starts " + this.getPriority());
        IntStream.iterate(0,i-> i+1).limit(10).forEach(i -> System.out.printf("%d ",i));
        System.out.println("Thread execution complete");
    }
    public static void main(String[] args){
        DaemonThreadEx example = new DaemonThreadEx();
        System.out.println("Is Main thread daemon ? " + Thread.currentThread().isDaemon());
        // setDaemon() must be called before start()
        example.setDaemon(true);
        example.start();
        IntStream.iterate(0,i-> i+1).limit(3).forEach(i -> System.out.printf("%d@main ",i));
    }
}

OUTPUT

Is Main thread daemon ? false
Is started thread daemon? true
Thread execution starts 5
0@main 0 1 2 3 4 1@main 5 6 2@main 7 8 9 Thread execution complete

In the above example, there are 2 threads involved.

  • The main thread, started by the JVM.
  • The thread started from main by the call to example.start(). We shall refer to this 2nd thread as the ‘Started thread’.

The call to example.setDaemon(true) marks the Started thread as a daemon before it is started. Its priority is unchanged: the output shows 5, which is Thread.NORM_PRIORITY. The main thread is a user thread and the Started thread is a daemon thread. After example.start() is executed, both threads run concurrently. Once the main thread (the only user thread here) completes, the JVM exits without waiting for the Started thread, so the Started thread may or may not complete its execution. Hence, running the above code gives varying results based on the execution time of the main thread.
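
If the work done on a daemon thread must finish before the program exits, the main thread can wait for it with join(). The following is a minimal sketch of that idea and is not part of the original example; the class name DaemonJoinSketch and the helper countOnDaemonThread() are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DaemonJoinSketch {

    // Hypothetical helper: counts up to 'limit' on a daemon thread and waits for it.
    static int countOnDaemonThread(int limit) {
        AtomicInteger counter = new AtomicInteger();
        Thread worker = new Thread(() -> {
            for (int i = 0; i < limit; i++) {
                counter.incrementAndGet();
            }
        });
        worker.setDaemon(true);  // must be set before start()
        worker.start();
        try {
            worker.join();       // blocks until the daemon thread has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Counted " + countOnDaemonThread(10));
    }
}
```

Because main blocks in join(), the JVM cannot exit before the daemon thread has completed its loop, so the result no longer depends on timing.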

3. Simple Threads

3.1 Extends Thread

SimpleThreadType1.java

import java.util.stream.IntStream;

public class SimpleThreadType1 extends Thread
{

    @Override
    public void run(){
        System.out.println("Thread execution starts " + this.getName()+"\n");
        IntStream.iterate(0,i-> i+1).limit(25).forEach(i -> System.out.printf("%d ",i));
        System.out.println("Thread execution complete " + this.getName()+"\n");

    }
    public static void main(String[] args){
        SimpleThreadType1 obj = new SimpleThreadType1();
        
        System.out.println("Main Thread " + Thread.currentThread().getName()+"\n");
        obj.start();
        IntStream.iterate(0,i-> i+1).limit(3).forEach(i -> System.out.printf("%d@main ",i));
        System.out.println("Main Thread Execution completes " + Thread.currentThread().getName()+"\n");
    }
}

In the above example, the class SimpleThreadType1 extends java.lang.Thread. In main(), the statement new SimpleThreadType1() creates the thread object obj. Because the class declares no constructor of its own, the no-argument constructor Thread() of its superclass java.lang.Thread is invoked. When start() is called on obj, a new thread is started and the code inside the run() method is executed in that thread.

3.2 Implement Runnable

SimpleThreadType2.java

import java.util.stream.IntStream;

public class SimpleThreadType2 implements Runnable {

    @Override
    public void run(){
        System.out.println("Thread execution starts " +"\n");
        IntStream.iterate(0,i-> i+1).limit(25).forEach(i -> System.out.printf("%d ",i));
        System.out.println("Thread execution complete" + "\n");

    }
    public static void main(String[] args){
        
        System.out.println("Main Thread " + Thread.currentThread().getName()+"\n");
        // The Runnable is the task; the Thread object is what executes it
        new Thread(new SimpleThreadType2()).start();
        IntStream.iterate(0,i-> i+1).limit(3).forEach(i -> System.out.printf("%d@main ",i));
        System.out.println("Main Thread Execution completes " + Thread.currentThread().getName()+"\n");
    }

}

In the above example, the statement new Thread(new SimpleThreadType2()) creates a thread object using the constructor Thread(Runnable target) of java.lang.Thread. The Runnable represents the task to be executed in the thread.
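
Since Runnable has a single abstract method, JDK 8 also allows the task to be written as a lambda expression instead of a separate class. The sketch below illustrates this under that assumption; the class LambdaRunnableSketch and the method runInNamedThread() are hypothetical names, and the two-argument constructor Thread(Runnable target, String name) is used only to give the thread a recognizable name.

```java
class LambdaRunnableSketch {

    // Hypothetical helper: runs a lambda on a named thread and reports which thread ran it.
    static String runInNamedThread(String threadName) {
        StringBuilder seen = new StringBuilder();
        // The lambda is the Runnable task; no class implementing Runnable is needed
        Thread worker = new Thread(() -> seen.append(Thread.currentThread().getName()), threadName);
        worker.start();
        try {
            worker.join();  // join() also makes the worker's writes visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println("Task ran on " + runInNamedThread("worker-1"));
    }
}
```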


4. Executors

The Java Executor framework was introduced in JDK 1.5 to manage threads efficiently in applications.

Figure: Executor Framework Class Diagram

The above class diagram gives an overview of the Executor framework.

java.util.concurrent.Executor is an interface that abstracts the task submission to a thread. It decouples the task submission mechanism from the task execution.

java.util.concurrent.ExecutorService is an Executor with enhanced capabilities: it can manage termination, produce a Future (a handle to the result of a task's execution) and execute collections of tasks.

java.util.concurrent.AbstractExecutorService provides default implementations for the submit, invokeAny and invokeAll methods.

java.util.concurrent.ScheduledExecutorService is an ExecutorService that can schedule tasks to run after a given delay or periodically.

ForkJoinPool and ThreadPoolExecutor are concrete implementations of java.util.concurrent.AbstractExecutorService. ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements ScheduledExecutorService.

ExecutorServiceEx1.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceEx1 {
    
    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println("Main Thread starts " + Thread.currentThread().getName()+"\n");
        //submitting a runnable
        executor.submit(()-> {
            try{
                System.out.println("Task thread starts:" + Thread.currentThread().getName()+"\n");
                Thread.sleep(5000); 
            } catch(InterruptedException iex){
                iex.printStackTrace();
            }
            System.out.println("Task thread ends:" + Thread.currentThread().getName()+"\n");
        });
        //the submitted task still runs to completion after shutdown()
        executor.shutdown();
        System.out.println("Main Thread shut down the executor " + Thread.currentThread().getName()+"\n");
    }
}

In the above example, the statement Executors.newSingleThreadExecutor() instantiates an ExecutorService using one of the factory methods of the utility class java.util.concurrent.Executors. newSingleThreadExecutor() is a static method that creates an ExecutorService with one worker thread.

Other static methods that are used to create an ExecutorService are newCachedThreadPool(), newFixedThreadPool() and newScheduledThreadPool().

Ensure you shut down the executor service, as the call to executor.shutdown() does here. If you don’t shut it down, its worker thread (a user thread by default) stays alive and the application keeps running even when all the tasks are completed. shutdown() initiates an orderly shutdown of the executor service: previously submitted tasks continue to execute until completion, but no new task is accepted. What happens when a task is submitted after shutdown() is initiated? A RejectedExecutionException is thrown. Please try the following example.

ExecutorServiceEx2.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceEx2 {
    
    public static void main(String[] args){
        ExecutorServiceEx2 app = new ExecutorServiceEx2();
        app.execute();
    }
    
    public void execute()
    {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        System.out.println("execute() @ Main Thread starts " + Thread.currentThread().getName()+"\n");
        //submitting a runnable
        executor.execute(new Task());
        executor.shutdown();
        //rejected: the executor accepts no new tasks after shutdown()
        executor.execute(new Task());
        
        System.out.println("Main Thread shut down the executor " + Thread.currentThread().getName()+"\n");
    }
    
    class Task implements Runnable{
        @Override
        public void run(){
            try{
                System.out.println("Task thread starts:" + Thread.currentThread().getName()+"\n");
                Thread.sleep(5000); 
            } catch(InterruptedException iex){
                iex.printStackTrace();
            }
            System.out.println("Task thread ends:" + Thread.currentThread().getName()+"\n");
        }
    }
}

In the above example, the executor service is shut down by the call to executor.shutdown(), and a second task is submitted immediately afterwards. The executor refuses to accept this task and the second call to executor.execute() throws a RejectedExecutionException. The first task, submitted before the shutdown, still runs to completion.
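
The rejection can also be observed programmatically by catching the exception. This is a small sketch rather than part of the original source; the class name RejectedAfterShutdownSketch and its method are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectedAfterShutdownSketch {

    // Hypothetical helper: returns true if a task submitted after shutdown() is rejected.
    static boolean isRejectedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.shutdown();
        try {
            executor.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException rejected) {
            // RejectedExecutionException is unchecked, so callers are not forced to handle it
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected after shutdown: " + isRejectedAfterShutdown());
    }
}
```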

Another variant of shutdown() is shutdownNow(). The shutdownNow() attempts to stop all the active tasks, halts the processing of waiting tasks and returns a list of tasks that were awaiting execution.
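
To make the return value of shutdownNow() concrete, the sketch below keeps the single worker thread busy, queues a few more tasks behind it and then calls shutdownNow(). It is an illustration under these assumptions, not the article's source; ShutdownNowSketch and pendingAfterShutdownNow() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ShutdownNowSketch {

    // Hypothetical helper: returns how many queued tasks shutdownNow() hands back.
    static int pendingAfterShutdownNow(int queuedTasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // The first task occupies the only worker thread until it is interrupted
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();  // shutdownNow() interrupts the worker
            }
        });
        // These tasks wait in the queue behind the first one
        for (int i = 0; i < queuedTasks; i++) {
            executor.execute(() -> System.out.println("never runs"));
        }
        try {
            started.await();  // make sure the first task is running, not queued
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        List<Runnable> neverStarted = executor.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println("Tasks that never started: " + pendingAfterShutdownNow(3));
    }
}
```

Note that a running task stops only if it responds to interruption, as the sleeping task does here.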

5. Callable & Future

java.util.concurrent.Callable<V> is an interface that represents an asynchronous task. The computation to be performed is defined in the call() method. Unlike run() in java.lang.Runnable, call() can return a value and throw checked exceptions. When a Callable is submitted to an ExecutorService, the value returned by call() is made available through a java.util.concurrent.Future<V> object, where V is the type returned by the call() method.

A Future is a holder for the result of a long-running computation that may not have finished yet. If the computation returns a type, say Integer, the result would be of type Future<Integer>. The Future provides the isDone() method to check whether the computation is completed and the get() method to retrieve the result of the computation.

A Callable cannot be passed directly to a Thread constructor for execution. It has to be passed to an ExecutorService via the submit() method.
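
Before the larger example, here is a minimal sketch of the submit() and get() round trip. The class CallableSketch and the method sumOnWorker() are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

class CallableSketch {

    // Hypothetical helper: sums 1..n on a worker thread and returns the result.
    static int sumOnWorker(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> IntStream.rangeClosed(1, n).sum();
            Future<Integer> future = executor.submit(task);
            return future.get();  // blocks until call() has returned
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            // An exception thrown by call() arrives wrapped in ExecutionException
            throw new IllegalStateException(ex.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumOnWorker(10));
    }
}
```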

Following is an example that searches a given String in all the files in a given folder.

SearchOperation.java

import java.io.File;
import java.nio.file.Files;
import java.util.concurrent.Callable;

public class SearchOperation implements Callable<SearchResult>
{
    private final File fileToSearch;
    private final String searchString;
     
    public SearchOperation(File fileName, String searchString){
      this.fileToSearch = fileName;
      this.searchString = searchString;
    }

    @Override
    public SearchResult call() throws Exception {
        System.out.println("inside call : " + fileToSearch);
        // Reads the whole file into memory, decoded with the platform default charset
        String content = new String(Files.readAllBytes(fileToSearch.toPath()));
        SearchResult result = new SearchResult();
        result.setFileName(fileToSearch.getAbsolutePath());
        result.setIsFound(content.contains(searchString));
        return result;
    }
}

SearchResult.java

public class SearchResult {

    private boolean isFound;
    private String fileName;
  
    public boolean isIsFound() {
        return isFound;
    }

    public void setIsFound(boolean isFound) {
        this.isFound = isFound;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public String toString() {
        return "SearchResult{" + "isFound=" + isFound + ", fileName=" + fileName + '}';
    }
    
}

SearchTextExample.java

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SearchTextExample {
    
    public static void main(String[] args){
        try{
            SearchTextExample app = new SearchTextExample();
            app.executeSearch();
        } catch (Exception ex){
            ex.printStackTrace();
        }
    }
    
    public void executeSearch() throws Exception 
    {
        String searchKey = "monster";
        File searchFolder = new File("testdata");
        
        File[] files = searchFolder.listFiles();
        if(files == null){
            // listFiles() returns null when the path is not a readable directory
            System.out.println("Not a directory: " + searchFolder.getAbsolutePath());
            return;
        }
        System.out.println("this is a directory " + searchFolder.getAbsolutePath());
        
        List<Future<SearchResult>> taskResultList = new ArrayList<>();
        
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for(File file : files) {
            System.out.println("File : "+ file.getAbsolutePath());
            taskResultList.add(executorService.submit(new SearchOperation(file, searchKey)));
        }
        
        TimeUnit.SECONDS.sleep(5);
        //Iterate Futures
        for(Future<SearchResult> futureElement : taskResultList){
            if(futureElement.isDone()){
                System.out.println("Future element is done");
                System.out.println(futureElement.get());
            } else {
                System.out.println("Future element is not done");
            }
        }
        //Shut down once, after all the Futures have been inspected
        executorService.shutdown();
    }
}

In SearchTextExample displayed above, Executors.newFixedThreadPool(3) creates an ExecutorService backed by a fixed-size pool of 3 threads. Each task submitted to the executor service via submit() is a SearchOperation, a Callable that returns a SearchResult POJO. submit() returns immediately with a Future, which holds the SearchResult once the search is complete. After the searches are submitted, the list of Futures is iterated. If isDone() returns true, the execution is complete and get() returns the SearchResult object.
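
The fixed five-second sleep is only a guess at how long the searches take. One alternative is invokeAll(), which submits a collection of Callables and returns only when all of them have completed. The sketch below assumes in-memory strings instead of files so that it stays self-contained; InvokeAllSketch and containsKey() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Hypothetical helper: checks each document for 'key' in parallel, preserving order.
    static List<Boolean> containsKey(List<String> documents, String key) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (String document : documents) {
                tasks.add(() -> document.contains(key));
            }
            List<Boolean> results = new ArrayList<>();
            // invokeAll() blocks until every task is done; Futures come back in task order
            for (Future<Boolean> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> documents = Arrays.asList("a monster appears", "nothing here");
        System.out.println(containsKey(documents, "monster"));
    }
}
```

With this approach no isDone() check is needed, because every Future returned by invokeAll() is already complete.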

6. FutureTask

java.util.concurrent.FutureTask<V> is an implementation of java.util.concurrent.RunnableFuture<V>, which in turn is a sub-interface that extends java.util.concurrent.Future<V> and java.lang.Runnable interfaces.

As the interfaces suggest, a FutureTask can be used to wrap a Callable or a Runnable. It can be run either on a standalone Thread or via an Executor.

FutureTaskExample1

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskExample1 {
    
    public static void main(String[] args){
        FutureTaskExample1 app = new FutureTaskExample1();
        app.demo();
   }
    
   public void demo(){
    
       //Method local inner class
       class DemoCallable implements Callable<Integer>{
        
           @Override
           public Integer call() throws Exception {
               System.out.println("call() execution " + Thread.currentThread().getName());
               return 0;
           }
       }
     
       //execution code for demo method
       System.out.println("demo() execution " + Thread.currentThread().getName());
       ExecutorService executor = Executors.newSingleThreadExecutor();
       FutureTask<Integer> futureTask = new FutureTask<>(new DemoCallable());
       executor.execute(futureTask);
       executor.shutdown();
   }
    
}

In the above example, DemoCallable is a local class defined inside demo(). A new FutureTask is created, wrapping the Callable, and is passed to the execute() method of the ExecutorService. Recollect the signature of execute(), inherited from the interface java.util.concurrent.Executor: void execute(Runnable command). A FutureTask, though it wraps a Callable, is also a Runnable.

FutureTaskExample2.java

FutureTask<?> futureTask = new FutureTask<Integer>(new DemoCallable());
new Thread(futureTask).start();

In the previous example, an ExecutorService is used to execute the FutureTask. In this example, the FutureTask that wraps the DemoCallable is passed to the constructor Thread(Runnable target).
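
When a FutureTask runs on a plain Thread, its get() method is still the way to read the result. The following sketch shows the complete round trip; the class FutureTaskThreadSketch and the method computeOnPlainThread() are hypothetical, and the multiplication merely stands in for a real computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskThreadSketch {

    // Hypothetical helper: runs a Callable on a plain Thread via FutureTask.
    static int computeOnPlainThread() {
        Callable<Integer> computation = () -> 6 * 7;
        FutureTask<Integer> futureTask = new FutureTask<>(computation);
        new Thread(futureTask).start();  // FutureTask is a Runnable, so Thread accepts it
        try {
            return futureTask.get();     // FutureTask is also a Future, so the result is readable
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + computeOnPlainThread());
    }
}
```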

FutureTaskExample3.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskExample3 {
    
    public static void main(String[] args){
        FutureTaskExample3 app = new FutureTaskExample3();
        try{
            app.demo();
        } catch (Exception ex){
            ex.printStackTrace();
        }
   }
    
   public void demo() throws Exception
   {
       //Method local inner class
       class DemoRunnable implements Runnable{
        
           @Override
           public void run() {
               System.out.println("run() execution " + Thread.currentThread().getName());
           }
       }
     
       //execution code for demo method
       System.out.println("demo() execution " + Thread.currentThread().getName());
       ExecutorService executor = Executors.newSingleThreadExecutor();
       //the 2nd argument is the value get() returns once run() completes
       FutureTask<String> futureTask = new FutureTask<>(new DemoRunnable(),"Complete Message");
       executor.execute(futureTask);
       System.out.println(futureTask.get());
       executor.shutdown();
   }
    
}

OUTPUT

demo() execution main
run() execution pool-1-thread-1
Complete Message

In the above example, DemoRunnable is defined as a local class inside demo(). Executors.newSingleThreadExecutor() creates an ExecutorService that pools a single thread. The FutureTask is created by wrapping the DemoRunnable together with the String "Complete Message", which becomes the result of the FutureTask when the run() of DemoRunnable completes. The call to futureTask.get() blocks until then and returns the String "Complete Message".

7. CompletableFuture

The java.util.concurrent.CompletableFuture<T> was introduced in JDK 8. It implements 2 interfaces, java.util.concurrent.Future and java.util.concurrent.CompletionStage.

CompletableFuture provides methods for creating, chaining and combining multiple Futures. Through CompletionStage, it defines the contract for an asynchronous computation step that can be combined with other steps. The API is large; the examples here cover only what is needed to get started.

7.1 CompletableFuture – with Runnable

CompletableExample.java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableExample {
    
    public static void main(String[] args){
        System.out.println("Initiating Main:  " + Thread.currentThread().getName());
        
        CompletableFuture<Void> completable
         = CompletableFuture.runAsync(new Runnable(){
            
            @Override
            public void run(){
                System.out.println("Start of runnable " + Thread.currentThread().getName());
                try{
                    TimeUnit.SECONDS.sleep(5);
                } catch(InterruptedException iex){
                    iex.printStackTrace();
                }
                System.out.println("End of runnable " + Thread.currentThread().getName());
            }
        });
        
        System.out.println("Completing Main:  " + Thread.currentThread().getName());
    }
}

OUTPUT

Initiating Main:  main
Completing Main:  main
Start of runnable ForkJoinPool.commonPool-worker-1

In the above example, a new Runnable is created and executed via the runAsync() method. With an ExecutorService, a Runnable is passed to execute(); with CompletableFuture, it is passed to runAsync(). Please note the output. The thread name is ForkJoinPool.commonPool-worker-1. ForkJoinPool.commonPool() returns a static thread pool that is lazily initialized when it is first needed. By default, CompletableFuture uses this thread pool to execute the task if no executor is passed to it. Note also that "End of runnable" is never printed: the worker threads of the common pool are daemon threads, so the JVM exits when main completes, while the Runnable is still sleeping.
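
Because the common pool's worker threads are daemon threads, a main thread that needs the asynchronous work to finish has to wait for it explicitly. The sketch below does so with join(); RunAsyncJoinSketch and runAndWait() are hypothetical names, and the short sleep merely stands in for real work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncJoinSketch {

    // Hypothetical helper: returns true if the asynchronous Runnable ran to its end.
    static boolean runAndWait() {
        AtomicBoolean finished = new AtomicBoolean(false);
        CompletableFuture<Void> completable = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);  // stands in for real work
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
            }
            finished.set(true);
        });
        completable.join();  // blocks the caller until the Runnable has completed
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Runnable finished: " + runAndWait());
    }
}
```

Unlike get(), join() throws only unchecked exceptions, which keeps the calling code free of try/catch blocks.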

7.2 CompletableFuture – with Supplier

CompletableExample1.java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableExample1 {
    
    public static void main(String[] args){
        System.out.println("Initiating Main:  " + Thread.currentThread().getName());
        
        CompletableFuture<String> completable
         = CompletableFuture.supplyAsync(new SampleSupplier());
        try{        
            //waits for at most 2 seconds for the result
            String result = completable.get(2,TimeUnit.SECONDS);
        }catch(Exception ex){
            
            System.out.println("Exception " + ex);
            completable.cancel(true);      
        }
        System.out.println("Completing Main:  " + Thread.currentThread().getName());
    }   
}

SampleSupplier.java

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class SampleSupplier implements Supplier<String>
{

    @Override
    public String get(){
        System.out.println("Start of SampleSupplier " + Thread.currentThread().getName());
        try{
            TimeUnit.SECONDS.sleep(5);
        } catch(InterruptedException iex){
            iex.printStackTrace();
        }
        System.out.println("End of SampleSupplier " + Thread.currentThread().getName());
        return "completed";
    }

}

OUTPUT

Initiating Main:  main
Start of SampleSupplier ForkJoinPool.commonPool-worker-1
Exception java.util.concurrent.TimeoutException
Completing Main:  main

In CompletableExample1, a Supplier is passed for execution to the method supplyAsync(). SampleSupplier is an implementation of the interface java.util.function.Supplier and it holds the computation code. A Runnable is executed using the runAsync() method; a Supplier is executed using the supplyAsync() method.

The SampleSupplier returns a String, which becomes the result of the CompletableFuture, as with a Future. The timed call completable.get(2, TimeUnit.SECONDS) waits for at most 2 seconds for that String. Since the SampleSupplier takes 5 seconds, the call times out and throws a TimeoutException. The catch block then cancels the CompletableFuture; note that cancel(true) marks the future as cancelled but does not interrupt the running Supplier.

7.3 CompletableFuture – with Supplier & blocking get()

CompletableExample2.java

import java.util.concurrent.CompletableFuture;

public class CompletableExample2 {
    
    public static void main(String[] args){
        System.out.println("Initiating Main:  " + Thread.currentThread().getName());
        String result = null;
        CompletableFuture<String> completable
         = CompletableFuture.supplyAsync(new SampleSupplier());
        try{        
            //blocks until the Supplier has returned
            result = completable.get();
        }catch(Exception ex){
            System.out.println("Exception " + ex);
            completable.cancel(true);
        }
        System.out.println("Result is " + result);
        System.out.println("Completing Main:  " + Thread.currentThread().getName());
    }   
}

OUTPUT

Initiating Main:  main
Start of SampleSupplier ForkJoinPool.commonPool-worker-1
End of SampleSupplier ForkJoinPool.commonPool-worker-1
Result is completed
Completing Main:  main

This example is similar to the one in the previous section. It demonstrates that the get() method of CompletableFuture is a blocking method, as CompletableFuture is also a Future. The main method completes only after get() returns.

7.4 CompletableFuture – with Executor

CompletableExample4.java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableExample4 {

    public static void main(String[] args){
        
        CompletableExample4 app = new CompletableExample4();
        try {
            app.test();
        } catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    private void test() throws Exception
    {
        ExecutorService execService = Executors.newSingleThreadExecutor();
        
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(new SampleSupplier(), execService);
        CompletableFuture<Void> cf2 = cf1.thenAcceptAsync(this::postProcess, execService);
        CompletableFuture<Void> cf3 = cf2.thenRun(() -> {
           System.out.println("Last phase of computation " + Thread.currentThread().getName());
        });
        try {
            //block until the last phase completes
            cf3.join();
        } finally {
            //always release the pool's thread so that the JVM can exit
            execService.shutdown();
        }
    }
    
    public void postProcess(Object result){
        System.out.println("Result is " + result + " / " + Thread.currentThread().getName());
    }
}

OUTPUT

Start of SampleSupplier pool-1-thread-1
End of SampleSupplier pool-1-thread-1
Result is completed / pool-1-thread-1
Last phase of computation pool-1-thread-1

In this example, an ExecutorService is one of the parameters passed to supplyAsync() and thenAcceptAsync(). So, the CompletableFuture now uses the thread pool of the ExecutorService instead of ForkJoinPool.commonPool().

Here, cf1 forms the 1st phase of computation. Its result is fed to cf2, created by thenAcceptAsync(), for the 2nd phase. Once cf2 completes, the action passed to thenRun() is executed. This forms the 3rd phase of computation.

The method thenAcceptAsync() accepts a Consumer to process the returned result.

Figure: CompletableExample4 – phases of processing CompletableFuture
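
Besides chaining phases one after another, two independent CompletableFutures can be combined once both have completed. The following is a small sketch of thenApply() and thenCombine(), assuming the default common pool; the class ChainAndCombineSketch, the method describe() and its inputs are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ChainAndCombineSketch {

    // Hypothetical helper: upper-cases 'first', measures 'second', and joins both results.
    static String describe(String first, String second) {
        // Phase chained onto a single future: transform the result with thenApply()
        CompletableFuture<String> upper =
                CompletableFuture.supplyAsync(() -> first).thenApply(String::toUpperCase);
        // An independent future that may run concurrently with the one above
        CompletableFuture<Integer> length =
                CompletableFuture.supplyAsync(() -> second.length());
        // thenCombine() runs its function only after both futures have completed
        return upper.thenCombine(length, (text, len) -> text + ":" + len).join();
    }

    public static void main(String[] args) {
        System.out.println(describe("java", "threads"));
    }
}
```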

8. ScheduledFuture

Methods of ScheduledExecutorService, such as schedule(), scheduleAtFixedRate() and scheduleWithFixedDelay(), return a ScheduledFuture.

CallableForScheduling.java

import java.time.LocalTime;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class CallableForScheduling implements Callable<Integer>
{
    private final Integer callID;
    
    public CallableForScheduling(int callID){
        this.callID = callID;
    }
    
    @Override
    public Integer call() throws Exception {
        String threadName = Thread.currentThread().getName();
        System.out.println("Initiating call() @ " + threadName+ " @ "  + LocalTime.now());
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Completing call() @ " + threadName + " @ " +LocalTime.now());
        return callID;
    }

}

ScheduledExample1.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledExample1 {
    public static void main(String[] args){
        ScheduledExample1 app = new ScheduledExample1();
        try {
            app.test();
        } catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    public void test() throws Exception
    {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        //run the callable once, 2 seconds from now
        ScheduledFuture<Integer> schedFuture = executor.schedule(new CallableForScheduling(5), 2, TimeUnit.SECONDS);
        
        System.out.println(schedFuture.get());
        executor.shutdown();
        
    }

}

In the above example, the callable CallableForScheduling is the task that needs to be scheduled. In test(), the callable is passed to the schedule() method of the ScheduledExecutorService. The Callable takes 5 as the parameter for callID. The 2nd and 3rd parameters of schedule() denote the delay and its time unit. Only after the delay has elapsed (i.e. after 2 seconds in this example) is the Callable task executed. The call to schedFuture.get() blocks until the task completes and returns the callID.
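
For repeated execution, scheduleAtFixedRate() takes a Runnable, an initial delay, a period and a time unit, and the returned ScheduledFuture can be used to cancel the repetition. The sketch below is an illustration only; FixedRateSketch and runPeriodically() are hypothetical names, and the 100 millisecond period is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Hypothetical helper: lets a periodic task run 'times' times, then cancels it.
    static int runPeriodically(int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        // Initial delay 0, then one run every 100 milliseconds
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {  // ignore any run that slips in before cancellation
                runs.incrementAndGet();
                done.countDown();
            }
        }, 0, 100, TimeUnit.MILLISECONDS);
        try {
            done.await();  // wait until the task has run the requested number of times
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            handle.cancel(false);  // stop further runs without interrupting a running one
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Periodic runs: " + runPeriodically(3));
    }
}
```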

9. Download the Source Code

You can download the full source code of this example here: Creating and Starting Java Threads

hashtagmercury

The author is a professional with experience in development projects ranging from Java to Mainframes and loves exploring new technologies and in keeping her knowledge on par with the trend.