Tags: java, multithreading, executorservice, hazelcast, completion-service

Processing a large number of tasks with CompletionService


I need to process a large number (>100 million) of requests on a multi-core machine. Each request processes a row in a data file and involves some I/O with a remote system; although the details do not matter much, the specific task is to load a distributed Hazelcast map from some data files. The execution will be handled through a ThreadPoolExecutor: one thread reads the file and submits the data to multiple independent threads, which put it into the map. The machine has 32 cores, so there are enough available to load the map in parallel.

Because of the large number of requests, the common approach of creating tasks and queueing them to the executor service is not feasible as the queued tasks will take up too much memory.

Which brings us to ExecutorCompletionService. With it, a new task would be submitted whenever a previous one completes, which is detected by calling take() (or poll(), as applicable). This works fine once all the threads of the executor service are in use. However, reaching that point ("loading up all the threads") has to be handled first. There are two phases:

  • fill up the queue: while there are still unused threads in the pool, submit tasks to the ExecutorCompletionService without waiting before submitting the next one

  • feed the queue: once all the threads are in use, submit a new task only when a previous task has finished. This way rows are fed as quickly as possible, but no quicker, and never pile up in a queue.

This logic can certainly be coded by hand, but I was wondering whether it is already implemented somewhere and I have somehow missed it, since it looks like a common scenario.
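For reference, here is a minimal sketch of the two-phase logic described above, assuming a plain fixed-size pool; the file name, pool size and processRow method are hypothetical placeholders, not taken from the original post:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FillThenFeed {

        public static void main(String[] args) throws Exception {
            int workers = 31;  // hypothetical sizing: leave one core for the reader thread
            ExecutorService pool = Executors.newFixedThreadPool(workers);
            CompletionService<Void> completion = new ExecutorCompletionService<>(pool);

            int inFlight = 0;
            try (BufferedReader reader = new BufferedReader(new FileReader("data.csv"))) { // hypothetical file
                String line;
                while ((line = reader.readLine()) != null) {
                    final String row = line;
                    if (inFlight < workers) {
                        // Phase 1 - fill: threads are still idle, submit without waiting.
                        completion.submit(() -> { processRow(row); return null; });
                        inFlight++;
                    } else {
                        // Phase 2 - feed: wait for one task to finish, then submit the next.
                        completion.take().get();  // take() blocks; get() surfaces any task exception
                        completion.submit(() -> { processRow(row); return null; });
                    }
                }
                // Drain the tasks that are still in flight.
                while (inFlight-- > 0) {
                    completion.take().get();
                }
            } finally {
                pool.shutdown();
            }
        }

        private static void processRow(String row) {
            // Hypothetical: parse the row and put it into the distributed Hazelcast map.
        }
    }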


Solution

  • You can specify a BlockingQueue implementation when you create a ThreadPoolExecutor. If all you're trying to avoid is creating an excess of Runnable objects, then you could use a bounded BlockingQueue, e.g. an ArrayBlockingQueue, and have a single thread pushing items onto the queue; it will block while the queue is at capacity.
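A minimal sketch of that setup, assuming a fixed pool size and a hypothetical processRow method. One caveat: with the default rejection handler a full ArrayBlockingQueue makes execute() throw rather than block, so this sketch swaps in CallerRunsPolicy, which throttles the reader thread by making it run a task itself whenever the queue is full:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BoundedQueueLoader {

        public static void loadRows(Iterable<String> rows) throws InterruptedException {
            int workers = 31;  // hypothetical sizing
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    workers, workers,
                    0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(2 * workers),   // bounded queue keeps memory use flat
                    // The default handler rejects tasks when the queue is full; CallerRunsPolicy
                    // instead runs the task on the submitting (reader) thread, throttling it.
                    new ThreadPoolExecutor.CallerRunsPolicy());

            for (String row : rows) {
                pool.execute(() -> processRow(row));
            }

            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS);
        }

        private static void processRow(String row) {
            // Hypothetical: parse the row and put it into the distributed Hazelcast map.
        }
    }

With this arrangement the bounded queue doubles as backpressure: the reader can never get more than the queue capacity ahead of the workers.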