June 28, 2022

Java Executor Tutorial - Executor, ExecutorService, ScheduledExecutorService

In this Java executor tutorial you’ll learn how to use Executor, ExecutorService, ScheduledExecutorService and their thread pool implementations to effectively manage threads in a large scale application.

Java Executor API

When you create a thread directly in Java, the task being done by the new thread, as defined by its Runnable object, is closely tied to the thread itself. This way of managing threads may not work well in large-scale applications, where it is better to separate thread creation and thread management from the business logic. The Java Executor framework does that by encapsulating thread creation and management in objects known as executors. The framework can be divided into the following three parts-

  1. Executor interfaces- There are three interfaces, Executor, ExecutorService and ScheduledExecutorService, that define the three executor object types.
  2. Thread pools- These are the executor implementation classes, like ThreadPoolExecutor and ScheduledThreadPoolExecutor, that execute each submitted task using one of the threads from a thread pool.
  3. Fork/Join framework- It is an implementation of the ExecutorService interface that helps you take advantage of multiple processors.

Java Executor interface

An object of type Executor executes submitted Runnable tasks. By using an Executor you don't need to create a thread explicitly.

For example if there is a Runnable object runnable then you can replace

(new Thread(runnable)).start();
with
executor.execute(runnable);

where executor is an Executor object.

Java Executor interface has a single method execute which is defined as follows-

void execute(Runnable command)- Executes the given runnable at some time in the future. The passed runnable may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
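
To see how the choice of thread is left to the implementation, here is a minimal sketch with two hand-written executors. The class names ExecutorKindsDemo, DirectExecutor and ThreadPerTaskExecutor are chosen only for this illustration; they are not classes provided by java.util.concurrent.

```java
import java.util.concurrent.Executor;

// Hypothetical class names, used only for this illustration.
class ExecutorKindsDemo {

  // Runs each task immediately in the caller's thread.
  static class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
      command.run();
    }
  }

  // Starts a new thread for every submitted task.
  static class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
      new Thread(command).start();
    }
  }

  public static void main(String[] args) {
    Runnable task = () ->
        System.out.println("Running in- " + Thread.currentThread().getName());

    Executor direct = new DirectExecutor();
    direct.execute(task);   // runs in the calling thread

    Executor perTask = new ThreadPerTaskExecutor();
    perTask.execute(task);  // runs in a newly created thread
  }
}
```

The calling code is the same in both cases; only the executor decides where the task runs.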

Java ExecutorService interface

The ExecutorService interface extends Executor and adds methods for shutting down the executor and for returning a Future that tracks a submitted task.

Apart from the base method execute() (inherited from the Executor interface), ExecutorService has the more versatile submit() method, which is overloaded to accept Runnable objects as well as Callable objects. A Callable allows the task to return a value.

Submit methods in ExecutorService
  • <T> Future<T> submit(Callable<T> task)- Submits a value-returning task for execution and returns a Future representing the pending results of the task.
  • Future<?> submit(Runnable task)- Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return null upon successful completion.
  • <T> Future<T> submit(Runnable task, T result)- Submits a Runnable task for execution and returns a Future representing that task. The Future's get method will return the given result upon successful completion.
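
The third overload, submit(Runnable task, T result), is the least familiar of the three, so here is a small sketch of it. The class name SubmitWithResultDemo and the helper method runAndGet are made up for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class SubmitWithResultDemo {

  // Submits a Runnable together with a fixed result value and waits for it.
  static String runAndGet(String result) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<String> future = executor.submit(
          () -> System.out.println("Task running in- " + Thread.currentThread().getName()),
          result);
      // get() returns the value passed to submit() once the task completes
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Task failed", e.getCause());
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Value- " + runAndGet("done"));
  }
}
```

This form can be handy when the task itself is a Runnable but the caller still wants a meaningful value from the Future instead of null.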
Shutdown methods in ExecutorService

You can shut down an ExecutorService which will cause it to reject new tasks.

  • void shutdown()- Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
  • List<Runnable> shutdownNow()- Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

Java ScheduledExecutorService interface

ScheduledExecutorService interface extends ExecutorService interface and adds functionality to schedule commands to run after a given delay, or to execute periodically.

Methods for scheduling in ScheduledExecutorService interface
  • schedule(Callable<V> callable, long delay, TimeUnit unit)- Creates and executes a ScheduledFuture that becomes enabled after the given delay.
  • schedule(Runnable command, long delay, TimeUnit unit)- Creates and executes a one-shot action that becomes enabled after the given delay.
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)- Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period.
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)- Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.
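
As a rough sketch of periodic scheduling, the following code runs a task at a fixed rate until it has executed a given number of times and then cancels it. The class name FixedRateDemo, the helper runPeriodically and the 100 ms period are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class FixedRateDemo {

  // Runs a periodic task until it has executed at least `runs` times,
  // then cancels it and returns the number of executions observed.
  static int runPeriodically(int runs, long periodMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger counter = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(runs);
    try {
      ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
        System.out.println("Run " + counter.incrementAndGet());
        latch.countDown();
      }, 0, periodMillis, TimeUnit.MILLISECONDS);
      latch.await();          // block until the task has run `runs` times
      handle.cancel(false);   // stop further executions
      return counter.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Executions- " + runPeriodically(3, 100));
  }
}
```

Replacing scheduleAtFixedRate with scheduleWithFixedDelay (same arguments) would measure the 100 ms from the end of one run to the start of the next rather than from start to start. A periodic task keeps running until it is cancelled or the executor is shut down, which is why the sketch cancels it explicitly.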

Java Executor implementation classes

Now we know about the executor interfaces and the methods defined in these interfaces. Java Executor framework also has predefined executor classes that implement these interfaces.

  • ThreadPoolExecutor- This class implements Executor and ExecutorService interfaces. ThreadPoolExecutor executes each submitted task using one of possibly several pooled threads.
  • ScheduledThreadPoolExecutor- This class extends ThreadPoolExecutor and implements ScheduledExecutorService. ScheduledThreadPoolExecutor schedules commands to run after a given delay, or to execute periodically.
  • ForkJoinPool- This class is an implementation of Executor and ExecutorService interfaces. ForkJoinPool class is used in Fork/Join framework for running ForkJoinTasks.
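
ThreadPoolExecutor and ScheduledThreadPoolExecutor are used in the examples later in this post, so here is only a brief sketch of ForkJoinPool. It sums a range of integers by splitting the range recursively. The class names ForkJoinSumDemo and RangeSum, and the threshold of 1000, are assumptions made for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical class names, used only for this illustration.
class ForkJoinSumDemo {

  // Sums the integers in [from, to] by splitting the range recursively.
  static class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off for splitting
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      // small enough, sum directly
      if (to - from < THRESHOLD) {
        long sum = 0;
        for (int i = from; i <= to; i++) {
          sum += i;
        }
        return sum;
      }
      // otherwise split the range into two halves
      int mid = from + (to - from) / 2;
      RangeSum left = new RangeSum(from, mid);
      RangeSum right = new RangeSum(mid + 1, to);
      left.fork();                            // run the left half asynchronously
      return right.compute() + left.join();   // compute the right half here, then combine
    }
  }

  static long sum(int from, int to) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new RangeSum(from, to));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Sum- " + sum(1, 10_000));
  }
}
```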

To read more about the ThreadPoolExecutor class in Java, please refer to this post- Java ThreadPoolExecutor - Thread Pool with ExecutorService

To read more about the ScheduledThreadPoolExecutor class in Java, please refer to this post- Java ScheduledThreadPoolExecutor - Scheduling With ExecutorService

Most of the executor implementations in java.util.concurrent use thread pools, which consist of worker threads. The advantages you get by using a thread pool are-

  1. A pooled thread exists separately from the Runnable and Callable tasks it executes and is often used to execute multiple tasks.
  2. Thread objects use a significant amount of memory. In a large-scale application, if each task uses its own thread, then allocating and deallocating many thread objects creates a significant memory management overhead. Using pooled threads minimizes the overhead due to thread creation.

Creating executors using Executors class

Before going into examples for Executor and ExecutorService, you must know about one more class, the Executors class in the Java concurrent API.

Rather than creating and using instances of ThreadPoolExecutor and ScheduledThreadPoolExecutor directly, you can use the static factory methods provided by the Executors class to get an executor. These factory methods create and return an ExecutorService or ScheduledExecutorService set up with commonly useful configuration settings.

The most commonly used factory methods are listed below-

  • static ExecutorService newCachedThreadPool()- Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
  • static ExecutorService newFixedThreadPool(int nThreads)- Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks.
  • static ExecutorService newSingleThreadExecutor()- Creates an Executor that uses a single worker thread operating off an unbounded queue.
  • static ScheduledExecutorService newSingleThreadScheduledExecutor()- Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically.
  • static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)- Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
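
To give an idea of what such a factory method saves you from writing, here is a sketch that builds a fixed-size pool by calling the ThreadPoolExecutor constructor directly. The configuration shown (core size equal to maximum size, zero keep-alive time, unbounded LinkedBlockingQueue) matches the description of newFixedThreadPool() given above, but treat it as an illustration rather than a statement of what every JDK does internally. The class name DirectPoolDemo and the helper fixedPool are made up for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class DirectPoolDemo {

  // Builds a pool configured like a fixed-size pool: core size equals
  // maximum size, no keep-alive time, tasks wait in an unbounded queue.
  static ThreadPoolExecutor fixedPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
  }

  public static void main(String[] args) {
    ThreadPoolExecutor executor = fixedPool(2);
    executor.execute(() ->
        System.out.println("Running in- " + Thread.currentThread().getName()));
    System.out.println("Core pool size- " + executor.getCorePoolSize());
    executor.shutdown();
  }
}
```

Constructing the pool yourself is mainly useful when you need settings the factory methods do not expose, such as a bounded queue.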

Java ExecutorService example

1- In this example an ExecutorService is created using the newFixedThreadPool() method of the Executors class. The thread pool is created with 2 threads, so these 2 threads are used to execute all the submitted tasks.

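import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {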
  public static void main(String[] args) {
    // creating executor with pool of 2 threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // running 4 tasks using pool of 2 threads
    executor.execute(new Task());
    executor.execute(new Task());
    executor.execute(new Task());
    executor.execute(new Task());
    executor.shutdown();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
      // print the trace and restore the interrupt status of the pool thread
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }
}
Output
Executing task (Thread name)- pool-1-thread-2
Executing task (Thread name)- pool-1-thread-1
Executing task (Thread name)- pool-1-thread-2
Executing task (Thread name)- pool-1-thread-1

As you can see, the 4 tasks are executed using the 2 threads from the pool. The order in which the thread names appear may differ from run to run.

2- In this Java ExecutorService example the submit() method of the ExecutorService is used to run a Runnable task.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {
  public static void main(String[] args) {
    // creating executor with pool of 2 threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // running 4 tasks using pool of 2 threads
    Future<?> f1 = executor.submit(new Task());
    Future<?> f2 = executor.submit(new Task());
    Future<?> f3 = executor.submit(new Task());
    Future<?> f4 = executor.submit(new Task());
    try {
      System.out.println("f1- " + f1.get());
      System.out.println("f2- " + f2.get());
      if(f3.get() == null) {
        System.out.println("submitted task executed successfully");
      }
    } catch (InterruptedException | ExecutionException e) {
      // get() was interrupted while waiting, or the task threw an exception
      e.printStackTrace();
    }
    executor.shutdown();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
      // print the trace and restore the interrupt status of the pool thread
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }
}
Output
Executing task (Thread name)- pool-1-thread-2
Executing task (Thread name)- pool-1-thread-1
Executing task (Thread name)- pool-1-thread-2
Executing task (Thread name)- pool-1-thread-1
f1- null
f2- null
submitted task executed successfully

As you can see, for a Runnable task the Future’s get() method returns null upon successful completion of the task.

3- In this example the submit() method of the ExecutorService is used to run Callable tasks. Two classes implement Callable, and two instances of each are submitted. The value returned from each Callable is then displayed using the Future’s get() method.

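import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {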
  public static void main(String[] args) {
    // creating executor with pool of 2 threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // running 4 tasks using pool of 2 threads
    Future<String> f1 = executor.submit(new Task1());
    Future<String> f2 = executor.submit(new Task1());
    Future<String> f3 = executor.submit(new Task2());
    Future<String> f4 = executor.submit(new Task2());
    try {
      System.out.println("f1- " + f1.get());
      System.out.println("f2- " + f2.get());
      System.out.println("f3- " + f3.get());
      System.out.println("f4- " + f4.get());
    } catch (InterruptedException | ExecutionException e) {
      // get() was interrupted while waiting, or the task threw an exception
      e.printStackTrace();
    }
    executor.shutdown();
  }
}
class Task1 implements Callable<String>{
  @Override
  public String call() throws Exception {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
      // print the trace and restore the interrupt status of the pool thread
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
    return "In Task1";
  }
}

class Task2 implements Callable<String>{
  @Override
  public String call() throws Exception {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
      // print the trace and restore the interrupt status of the pool thread
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
    return "In Task2";
  }
}
Output
Executing task (Thread name)- pool-1-thread-1
Executing task (Thread name)- pool-1-thread-2
f1- In Task1
Executing task (Thread name)- pool-1-thread-1
f2- In Task1
Executing task (Thread name)- pool-1-thread-2
f3- In Task2
f4- In Task2

Java ScheduledExecutorService example

In this example a ScheduledExecutorService is created using the newScheduledThreadPool() method of the Executors class. A callable task is scheduled to execute after a delay of 3 seconds.

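import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {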
  public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
    // Callable implementation
    Callable<String> c = ()->{
      System.out.println("Executed at- " + new Date());
      return "Executing task";
    };
    System.out.println("Time before execution started- " + new Date());
    // scheduling tasks with callable as param to be
    // executed after a delay of 3 Secs
    ScheduledFuture<String> sf = scheduledExecutor.schedule(c, 3, TimeUnit.SECONDS); 
    try {
      System.out.println("Value- " + sf.get());
    } catch (InterruptedException | ExecutionException e) {
      // get() was interrupted while waiting, or the task threw an exception
      e.printStackTrace();
    }
    scheduledExecutor.shutdown();
  }
}
Output
Time before execution started- Fri Jan 04 10:25:14 IST 2019
Executed at- Fri Jan 04 10:25:17 IST 2019
Value- Executing task

ExecutorService shutdown example

In the previous examples the shutdown() method was used to terminate the executor. Since shutdown() lets previously submitted tasks run before the executor terminates, there was no problem. But there is also a shutdownNow() method, which does not wait for actively executing tasks to terminate. Let’s see it with an example.

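import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {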
  public static void main(String[] args) {
    // creating executor with pool of 2 threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // running 4 tasks using pool of 2 threads
    Future<?> f1 = executor.submit(new Task());
    Future<?> f2 = executor.submit(new Task());
    Future<?> f3 = executor.submit(new Task());
    Future<?> f4 = executor.submit(new Task());
    System.out.println("shutting down instantly");
    executor.shutdownNow();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(1000);
    } catch (InterruptedException e) {
      // shutdownNow() interrupts the sleeping thread; print the trace
      e.printStackTrace();
    }
  }
}
Output
java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at java.base/java.lang.Thread.sleep(Thread.java:340)
	at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:403)
	at com.knpcode.Task.run(ExecutorExp.java:46)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:844)
java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at java.base/java.lang.Thread.sleep(Thread.java:340)
	at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:403)
	at com.knpcode.Task.run(ExecutorExp.java:46)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:844)
shutting down instantly
Executing task (Thread name)- pool-1-thread-1
Executing task (Thread name)- pool-1-thread-2

As you can see here, the shutdown is immediate. The two running tasks were sleeping when shutdownNow() interrupted their threads, which is why InterruptedException is thrown. The two tasks that were still waiting in the queue are never started.
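
The output above shows only two "Executing task" lines even though four tasks were submitted. The remaining tasks are handed back in the list returned by shutdownNow(). The following sketch makes that visible; the class name ShutdownNowPendingDemo and the helper pendingAfterShutdownNow are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ShutdownNowPendingDemo {

  // Submits `tasks` long-sleeping tasks to a pool of `threads` threads, then
  // calls shutdownNow() and returns how many tasks were still waiting.
  static int pendingAfterShutdownNow(int threads, int tasks) {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < tasks; i++) {
      executor.execute(() -> {
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
          // interrupted by shutdownNow(); restore the flag and end the task
          Thread.currentThread().interrupt();
        }
      });
    }
    // interrupts the running tasks and returns the ones that never started
    List<Runnable> pending = executor.shutdownNow();
    return pending.size();
  }

  public static void main(String[] args) {
    System.out.println("Tasks that never started- " + pendingAfterShutdownNow(2, 4));
  }
}
```

With a pool of 2 threads and 4 tasks, the two tasks still sitting in the queue are the ones returned. If those tasks matter, the caller can log them or run them elsewhere.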

The recommendation as per the Java docs is to shut down an ExecutorService in two phases.

First call shutdown() to reject incoming tasks, and then call shutdownNow(), if necessary, to cancel any lingering tasks. Each of these calls should be followed by awaitTermination() to give the executing tasks time to finish. The next example shows this usage.

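import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorExp {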

  public static void main(String[] args) {
    // creating executor with pool of 2 threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // running 4 tasks using pool of 2 threads
    Future<?> f1 = executor.submit(new Task());
    Future<?> f2 = executor.submit(new Task());
    Future<?> f3 = executor.submit(new Task());
    Future<?> f4 = executor.submit(new Task());
    System.out.println("shutting down in two phases");
    shutdownAndAwaitTermination(executor);
  }
  // For shutdown
  static void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // Disable new tasks from being submitted
    try {
      // Wait a while for existing tasks to terminate
      if (!pool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
        pool.shutdownNow(); // Cancel currently executing tasks
        // Wait a while for tasks to respond to being cancelled
        if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS))
          System.err.println("Pool did not terminate");
      }
    } catch (InterruptedException ie) {
     // Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
    }
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName());
    try {
      TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
      // print the trace and restore the interrupt status of the pool thread
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }
}

That's all for the topic Java Executor Tutorial - Executor, ExecutorService, ScheduledExecutorService. If something is missing or you have something to share about the topic please write a comment.

