Search code examples
javamultithreadingspring-bootthreadpoolexecutor

Why are the number of threads used are higher than required?


I have an SpringBoot application where I have allowed at most 45 concurrent requests. Now, 1 request in its journey calls 16 external services in parallel using threadPool A. So keeping the average case and worst case in mind, I kept following configurations for it :

ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();
A.setCorePoolSize(400);
A.setMaxPoolSize(1000);
A.setQueueCapacity(10);
A.setThreadNamePrefix("async-executor");
A.initialize();

My expectations here were that at most 45*16 = 720 threads will be used. But on running load test, I observe that threads kept on getting open (checked in thread dump), and after few minutes it started giving RejectedExecutionException.

RejectedExecutionException
Task ServiceX rejected from org.springframework.scheduling.concurrent.
ThreadPoolTaskExecutor$1@4221a19e[Running, pool
size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]

Most of the threads as shown in thread dump

"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

I wanted to know what I am missing here? Why am I getting rejection?

Edit : I tried to replicate similar thing on a short piece of code, here it goes :

A MainClass runs an long loop. Inside each loop it calls a service1 3 times. For now I have demo service which just have same code Thread.sleep(100) inside them.

MainClass.java

package com.flappy.everything.threadpooling;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MainClass {

    private static ThreadPoolTaskExecutor getExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setThreadNamePrefix("async-exec");
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(2);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolTaskExecutor outerExecutor = getExecutor();
        List<Service1> services = Arrays.asList(new Service1(), new Service1(), new Service1());
        for (int i = 0; i < 1000000; i++) {
            List<Future> futures = new ArrayList<>();
            for (Service1 service : services) {
                futures.add(outerExecutor.submit(() -> {
                    try {
                        service.set();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }));
            }
            for (Future future : futures) {
                future.get();
            }
        }
    }
}

Service1.java

package com.oyorooms.everything.threadpooling;

import org.springframework.scheduling.annotation.Async;

public class Service1 {
    public void set() throws InterruptedException {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName());
    }
}

So ideally only 3 threads should have been open for the threadPool I gave but still, I get Rejection on running the code.


Solution

  • This was an interesting one.

    The reason the code you listed is failing is because the time it takes to transfer an element from the work queue to the worker thread is slower than the time it takes for the main thread to put the items on the queue.

    The flow goes like this:

    if(there are active threads and is there availability on the queue){
        submit to the work queue for the worker threads to pick up // 1
    } else {
       if(max pool size is not met){
          create a new thread with this task being its first task // 2
       } else { 
          reject // 3
       }
    } 
    

    What you are seeing is the code hitting // 3.

    When you first submit your tasks, the number of threads will be less than the max pool size. The first round of tasks being submitted is going to get to // 2.

    After the first iteration, the number of active threads will be the max pool size and the code will be trying to submit to // 1.

    Let's say that the main thread puts 3 items to the queue very very quickly so that the 4 threads in the ThreadPool aren't able to pull one off fast enough. If that happens, we will pass the first if statement (since there is no availability on the queue) and get to the else. Since the max pool size has already been met, then there is nothing else to do but reject.

    This can further be explained by inspecting the ThreadPoolExecutor Javadocs.

    If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

    and later

    Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.

    To resolve your issue, you have two reasonable choices:

    1. Use a SynchronousQueue. A thread offering to a SynchronousQueue will wait indefinitely, until another thread takes the item, if it knows another thread is waiting to receive it. The fixed queue size you define will cause the main thread to return (without blocking) if a put does not succeed (ie, another thread doesn't immediately take it off). To use a SynchronousQueue using Spring, set the queue capacity to zero. setQueueCapacity(0). Also from the Javadocs

      A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them.

    2. Set the queue size to be greater than or equal to the number of concurrent tasks you expect to be submitted. The size of the queue will likely not get to that size in general, but it will protect you in the future.