I have an application that needs to ingest data into ES. There are many rows, so I am using ThreadPoolTaskExecutor to parallelize the ingest in batches of 50. I receive this data in my consumer and do not want to ack before all the records have been processed, so I have wrapped each task in a Runnable class. I add the tasks to a list of callables and call invokeAll() once the list holds all the data.
The maxPoolSize of my ThreadPoolTaskExecutor is 200 and the corePoolSize is 50. I was expecting the application to use 200 threads at a time because, as I understand it, ThreadPoolTaskExecutor uses up to corePoolSize threads while the number of queued tasks is below corePoolSize, and then starts adding threads up to maxPoolSize to execute the tasks. When I checked my logs and VisualVM, I found that only 50 threads were executing. I then increased the corePoolSize to 70 and found that 70 threads were executing, even though the queue size should be more than 3000 because the list has more than 3000 entries. Sample code for the issue:
public void Test() throws InterruptedException {
    List<Callable<Object>> tagList = new ArrayList<>(1000);
    for (int i = 0; i < 400; i++) {
        // Each task prints its thread and then sleeps to simulate work.
        tagList.add(Executors.callable(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread " + Thread.currentThread());
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }));
    }
    // taskExecutor stands for the configured ThreadPoolTaskExecutor instance
    // (name assumed here; getThreadPoolExecutor() is an instance method).
    taskExecutor.getThreadPoolExecutor().invokeAll(tagList);
}
INFO | 2021-02-14 00:06:32.086+05:30 | Application | 134 | current thread count: 50 total thread count: 200
INFO | 2021-02-14 00:06:33.085+05:30 | Application | 134 | current thread count: 50 total thread count: 200
Can someone please explain what I am missing?
maxPoolSize of my ThreadPoolTaskExecutor is 200 and corePoolSize is 50. So, I was expecting the application to use 200 threads at one time as by my understanding ThreadPoolTaskExecutor uses threads up to corePoolSize when queued up tasks are less than the corePoolSize and later it starts adding threads up to the maxPoolSize and executes the tasks.
That is not quite right. ThreadPoolTaskExecutor behaves like the ThreadPoolExecutor it wraps, whose documentation says:
When a new task is submitted (..), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
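That is why only 50 threads are running even though maxPoolSize is 200: unless you have set a queueCapacity, the queue is unbounded, so it never fills up and no threads beyond corePoolSize are created.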
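To see this rule in isolation, here is a minimal sketch using a plain java.util.concurrent.ThreadPoolExecutor with an unbounded LinkedBlockingQueue. The class name UnboundedQueueDemo, the helper largestPoolSize, and the scaled-down numbers (core 5, max 20, 40 tasks) are assumptions made for the demo, not part of your setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Runs taskCount sleeping tasks on a pool backed by an unbounded queue
    // and returns the largest number of threads the pool ever held.
    static int largestPoolSize(int corePoolSize, int maxPoolSize, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // unbounded, so it never fills up

        List<Callable<Object>> tasks = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> {
                Thread.sleep(100); // stand-in for real work
                return null;
            });
        }

        try {
            pool.invokeAll(tasks); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        // Hypothetical scaled-down numbers: core 5, max 20, 40 tasks.
        // prints "largest pool size: 5"
        System.out.println("largest pool size: " + largestPoolSize(5, 20, 40));
    }
}
```

The first 5 tasks each start a core thread, and the remaining 35 wait in the queue. Because the queue always accepts them, the pool has no reason to grow toward the maximum.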
The ThreadPoolTaskExecutor documentation says:
The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to Executors.newSingleThreadExecutor(), sharing a single thread for all tasks. Setting "queueCapacity" to 0 mimics Executors.newCachedThreadPool(), with immediate scaling of threads in the pool to a potentially very high number. Consider also setting a "maxPoolSize" at that point, as well as possibly a higher "corePoolSize" (see also the "allowCoreThreadTimeOut" mode of scaling).
It looks like you have to set queueCapacity to zero in order to meet your requirements.
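One caveat: with a zero-capacity queue nothing can wait, so once maxPoolSize threads are busy, any further task is rejected. The default handler (AbortPolicy) throws RejectedExecutionException, which matters if you submit more tasks than maxPoolSize in a single invokeAll(). The sketch below uses a plain ThreadPoolExecutor with a SynchronousQueue, which to my understanding is what ThreadPoolTaskExecutor creates for queueCapacity 0, plus CallerRunsPolicy so overflow tasks run on the submitting thread. The class name, helper name, and scaled-down numbers are assumptions for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCapacityQueueDemo {

    // Runs taskCount sleeping tasks on a pool with a zero-capacity queue
    // and returns the largest number of threads the pool ever held.
    static int largestPoolSize(int corePoolSize, int maxPoolSize, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),                   // zero capacity: tasks are handed off, never stored
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitting thread

        List<Callable<Object>> tasks = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> {
                Thread.sleep(200); // stand-in for real work
                return null;
            });
        }

        try {
            pool.invokeAll(tasks); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        // Hypothetical scaled-down numbers: core 5, max 20, 40 tasks.
        System.out.println("largest pool size: " + largestPoolSize(5, 20, 40));
    }
}
```

Here the pool should grow past the core size toward the maximum, since every task that cannot be handed to an idle thread starts a new one. On the Spring side, the corresponding setters are setQueueCapacity(0) and setRejectedExecutionHandler(...). Whether CallerRunsPolicy suits you depends on whether the consumer thread may be blocked while it runs a task.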