java multithreading threadpool threadpoolexecutor

Why can't I add tasks to the thread pool the second time in Java?


I create a thread pool to run a batch of tasks. After the first batch has been handled, I cannot submit and start another batch. How can I fix this? If I instead create the executor with an unbounded queue, executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("timeOutThread"));, it runs fine. But if a task is canceled because of the timeout, will this cause a memory leak?

ExecutorService executor =   new ThreadPoolExecutor(3,
                    3, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1),
                    new NamedThreadFactory(
                            "timeOutThread")); 
    List<Callable<String>> callableList = new ArrayList<>();
    IntStream.range(0, 3).forEach(index -> {
        callableList.add(() -> request(index));
    });
    List<Future<String>> futureList = executor.invokeAll(callableList, 1, TimeUnit.SECONDS); 
        for (int i = 0; i < futureList.size(); i++) {
            Future<String> future = futureList.get(i);
            try {
                list.add(future.get());
            } catch (CancellationException e) {
                log.info("timeOut task:{}", i);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        Thread.sleep(1000);
    callableList.clear();
    IntStream.range(0, 3).forEach(index -> {
        callableList.add(() -> request(index));
    });
      long start1 = System.currentTimeMillis();
     // Task java.util.concurrent.FutureTask@5fdcaa40 rejected from java.util.concurrent.ThreadPoolExecutor@6dc17b83   
    futureList = executor.invokeAll(callableList, 1, TimeUnit.SECONDS); // reuse the list declared for the first batch
    for (int i = 0; i < futureList.size(); i++) {
        Future<String> future = futureList.get(i);
        try {
            list.add(future.get());
        } catch (CancellationException e) {
            log.info("timeOut Task:{}", i);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

  public String request(int index) throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(200000);
    return "A";
  }

Solution

  • I can reproduce your error with the following simplified code:

    import java.util.ArrayList;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
      public static void main(String[] args) throws InterruptedException {
        var pool = new ThreadPoolExecutor(
            3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
        try {
          System.out.println("Executing first batch of tasks...");
          submitTasks(pool);
    
          System.out.println("Executing second batch of tasks...");
          submitTasks(pool);
        } finally {
          pool.shutdown();
        }
      }
    
      private static void submitTasks(ExecutorService executor) throws InterruptedException {
        var tasks = new ArrayList<Callable<Void>>(3);
        for (int i = 0; i < 3; i++) {
          tasks.add(() -> {
            Thread.sleep(2_000L);
            return null;
          });
        }
        executor.invokeAll(tasks);
      }
    }
    

    Which gives this output:

    Executing first batch of tasks...
    Executing second batch of tasks...
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@87aac27[Not completed, task = Main$$Lambda$1/0x0000000800c009f0@816f27d] rejected from java.util.concurrent.ThreadPoolExecutor@3e3abc88[Running, pool size = 3, active threads = 0, queued tasks = 1, completed tasks = 3]
            at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
            at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
            at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
            at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
            at Main.submitTasks(Main.java:32)
            at Main.main(Main.java:18)
    

    The problem is caused by the queue being too small. The LinkedBlockingQueue is created with a capacity of only one, but three tasks are submitted to the pool at once. So, the question becomes, why does it only fail on the second call to invokeAll?

    The reason has to do with how ThreadPoolExecutor is implemented. When an instance is first created, none of the core threads are started. They are started lazily as tasks are submitted. When the submission of a task results in a thread being started, that task is immediately given to the thread. The queue is bypassed. So, when invokeAll is called the first time, each of the three core threads is started and none of the tasks go into the queue.
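
    The lazy start can be observed directly. The following is a minimal sketch, not part of the original reproduction: the class name LazyStartDemo and the method observe are made up for illustration, and the tasks block on a latch so that each worker stays busy while the pool is inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the ThreadPoolExecutor calls are real API.
public class LazyStartDemo {

  /**
   * Returns the pool size right after construction and after each of three
   * submissions, followed by the queue size at the end.
   */
  static List<Integer> observe() {
    var pool = new ThreadPoolExecutor(
        3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
    var release = new CountDownLatch(1);
    var observed = new ArrayList<Integer>();
    try {
      observed.add(pool.getPoolSize()); // no core thread exists yet
      for (int i = 0; i < 3; i++) {
        // Each task waits for the latch, so its worker thread stays busy.
        pool.execute(() -> {
          try {
            release.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        observed.add(pool.getPoolSize()); // one new thread per submission
      }
      observed.add(pool.getQueue().size()); // nothing went through the queue
    } finally {
      release.countDown(); // let the tasks finish
      pool.shutdown();
    }
    return observed;
  }

  public static void main(String[] args) {
    System.out.println(observe()); // prints [0, 1, 2, 3, 0]
  }
}
```

    The pool size grows by one with every submission while the queue stays empty, which is why the capacity of one is not a problem for the first batch.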

    But the second time invokeAll is called, the core threads have already been started. Since submitting the tasks does not result in a thread being created, the tasks are put into the queue. But the queue has room for only one task, and invokeAll submits the tasks faster than the idle threads can take them out again (note "active threads = 0, queued tasks = 1" in the exception message), resulting in the RejectedExecutionException. If you're wondering why the core threads are still alive despite the keep-alive time being set to zero, that's because core threads are not allowed to die due to timeout by default (you have to explicitly configure the pool to allow that, with allowCoreThreadTimeOut(true)).
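
    For reference, here is a small sketch of how the core-thread timeout setting behaves. The class and method names are hypothetical; allowsCoreThreadTimeOut and allowCoreThreadTimeOut are the real ThreadPoolExecutor methods. Note that a pool with a keep-alive time of zero, like the one above, refuses to enable the setting, so the keep-alive time would have to be positive (the 100 ms below is an arbitrary value chosen for the example).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing the core-thread timeout flag.
public class CoreTimeoutDemo {

  private static ThreadPoolExecutor newPool(long keepAliveMillis) {
    return new ThreadPoolExecutor(
        3, 3, keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
  }

  /** By default, core threads ignore the keep-alive time. */
  static boolean defaultAllowsCoreTimeout() {
    var pool = newPool(0L);
    try {
      return pool.allowsCoreThreadTimeOut();
    } finally {
      pool.shutdown();
    }
  }

  /** Enabling the flag with a zero keep-alive time is rejected. */
  static boolean zeroKeepAliveRejected() {
    var pool = newPool(0L);
    try {
      pool.allowCoreThreadTimeOut(true);
      return false;
    } catch (IllegalArgumentException e) {
      return true; // core threads must have a nonzero keep-alive time
    } finally {
      pool.shutdown();
    }
  }

  /** With a positive keep-alive time the flag can be switched on. */
  static boolean positiveKeepAliveAccepted() {
    var pool = newPool(100L);
    try {
      pool.allowCoreThreadTimeOut(true);
      return pool.allowsCoreThreadTimeOut();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(defaultAllowsCoreTimeout());   // prints false
    System.out.println(zeroKeepAliveRejected());      // prints true
    System.out.println(positiveKeepAliveAccepted());  // prints true
  }
}
```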

    You can confirm that the lazily started core threads are the cause of the problem by modifying the code slightly. Adding this line just after creating the pool:

    pool.prestartAllCoreThreads();
    

    causes even the first call to invokeAll to fail with a RejectedExecutionException.

    Also, if you change the queue's capacity from one to three, then the RejectedExecutionException will no longer occur.
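
    As a sketch of that fix, the variant below runs two batches against a queue whose capacity matches the batch size. The class QueueCapacityDemo and the method runTwoBatches are hypothetical names, and the sleep is shortened to 100 ms to keep the run brief.

```java
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the pool settings mirror the reproduction above.
public class QueueCapacityDemo {

  /** Runs two batches of three tasks and returns how many tasks completed. */
  static int runTwoBatches(int queueCapacity) {
    var pool = new ThreadPoolExecutor(
        3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(queueCapacity));
    int completed = 0;
    try {
      for (int batch = 0; batch < 2; batch++) {
        var tasks = new ArrayList<Callable<Integer>>(3);
        for (int i = 0; i < 3; i++) {
          final int id = i;
          tasks.add(() -> {
            Thread.sleep(100L);
            return id;
          });
        }
        // invokeAll blocks until every task in the batch has finished.
        for (Future<Integer> future : pool.invokeAll(tasks)) {
          if (future.isDone() && !future.isCancelled()) {
            completed++;
          }
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return completed;
  }

  public static void main(String[] args) {
    // The queue can hold a whole batch, so the second batch is accepted.
    System.out.println(runTwoBatches(3)); // prints 6
  }
}
```

    The general rule of thumb this illustrates: once the core threads exist, the queue must be able to hold everything that is submitted in one burst, or the pool needs a rejection policy other than the default AbortPolicy.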


    Here's some relevant documentation, from the ThreadPoolExecutor Javadoc:

    Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

    • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
    • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
    • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
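
    The three rules can be traced with a small experiment. This is only a sketch under the same pool settings as above; the class QueueRulesDemo and the method trace are made-up names, and every task blocks on a latch so that no worker becomes free while the five submissions are made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class tracing where each submitted task ends up.
public class QueueRulesDemo {

  static List<String> trace() {
    var pool = new ThreadPoolExecutor(
        3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
    var release = new CountDownLatch(1);
    // Blocks until the latch opens, keeping the executing worker busy.
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    var lines = new ArrayList<String>();
    try {
      for (int i = 1; i <= 5; i++) {
        try {
          pool.execute(blocker);
          lines.add("task " + i + ": threads=" + pool.getPoolSize()
              + ", queued=" + pool.getQueue().size());
        } catch (RejectedExecutionException e) {
          lines.add("task " + i + ": rejected");
        }
      }
    } finally {
      release.countDown(); // unblock the workers so the pool can drain
      pool.shutdown();
    }
    return lines;
  }

  public static void main(String[] args) {
    trace().forEach(System.out::println);
    // prints:
    // task 1: threads=1, queued=0
    // task 2: threads=2, queued=0
    // task 3: threads=3, queued=0
    // task 4: threads=3, queued=1
    // task 5: rejected
  }
}
```

    Tasks 1 to 3 each start a core thread (first rule), task 4 is queued because all core threads exist (second rule), and task 5 is rejected because the queue is full and maximumPoolSize has been reached (third rule).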