Tags: java, multithreading, producer-consumer, threadpoolexecutor

Synchronous task producer/consumer using ThreadPoolExecutor


I'm trying to find a way to use a ThreadPoolExecutor in the following scenario:

  • I have a separate thread producing tasks and submitting them to the thread pool
  • a task submission is synchronous and blocks until the task can be started by the ThreadPoolExecutor
  • at any given time, only a fixed number of tasks can execute in parallel. An unbounded number of tasks running at the same time may result in memory exhaustion.
  • before submitting a task, the producer thread always checks that some maximum build time has not been exceeded since the first task was submitted. If it has been exceeded, the producer thread shuts down, but any task currently running on the thread pool runs to completion before the application terminates.
  • when the producer thread terminates, there should be no unstarted task left on the queue of the thread pool.

To give more context, I currently just submit all tasks at once and cancel all the futures returned by ExecutorService.submit once the max build time has expired. I ignore all resulting CancellationExceptions since they are expected. The problem is that the behaviour of Future.cancel(false) is odd and does not fit my use case:

  • it prevents any unstarted task from running (good)
  • it does not interrupt currently running tasks and lets them run to completion (good)
  • however, any exception thrown by a running task after it has been cancelled is discarded: Future.get() throws a CancellationException whose getCause() is null instead. Therefore, I can't distinguish a task that was cancelled before running from a task that kept running after the max build time and then failed with an exception! That's unfortunate, because in that case I would like to propagate the exception and report it to some error-handling mechanism.
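
The behaviour in the last bullet can be reproduced in isolation. The following is a minimal sketch, not the code from my build system: the class name CancelMasksFailureDemo, the latches and the exception message are made up for illustration. It cancels a task that is already running, lets the task throw afterwards, and reports what Future.get() exposes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original question.
class CancelMasksFailureDemo {

    /** Describes what Future.get() reports for a task that fails after cancel(false). */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFail = new CountDownLatch(1);
        Callable<Void> failingTask = () -> {
            started.countDown();
            mayFail.await(); // keep running until the caller has cancelled the future
            throw new IllegalStateException("build step failed");
        };
        try {
            Future<Void> future = pool.submit(failingTask);
            started.await();                          // the task is running now
            boolean cancelled = future.cancel(false); // cancel without interrupting
            mayFail.countDown();                      // let the task throw
            try {
                future.get();
                return "completed normally";
            } catch (CancellationException e) {
                return "cancelled=" + cancelled + ", cause=" + e.getCause();
            } catch (ExecutionException e) {
                return "failed: " + e.getCause();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "cancelled=true, cause=null"
    }
}
```

The IllegalStateException thrown by the task never reaches the caller: once the future is in the cancelled state, the later failure is dropped and only the CancellationException is visible.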

I looked into the different blocking queues Java has to offer and found this: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html. That seemed ideal at first, but according to https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html, it does not seem to work with ThreadPoolExecutor the way I want it to:

Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
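
The rejection described in that excerpt is easy to trigger with a bounded pool. Here is a small sketch under my own assumptions (the class name DirectHandoffRejectionDemo and the pool size are arbitrary, and the pool uses the default rejection policy): it submits one task more than the pool has threads, while every thread is kept busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
class DirectHandoffRejectionDemo {

    /** Submits poolSize + 1 blocking tasks and returns how many were rejected. */
    static int countRejections(int poolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < poolSize + 1; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No idle worker to hand off to and no room for a new thread.
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejections(2)); // prints 1
    }
}
```

So ThreadPoolExecutor.execute does not block when the pool is saturated; with the default policy it throws, which is the opposite of the blocking submission I am after.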

Ideally, the consumer (= the pool) would block on SynchronousQueue.poll and the producer (= the task producer thread) would block on SynchronousQueue.put.

Any idea how I can implement the scenario I described without writing complex scheduling logic myself (which is what ThreadPoolExecutor should encapsulate for me)?


Solution

  • I found an alternative to the option proposed by @Carlitos Way. It consists of adding tasks directly to the queue using BlockingQueue.offer. The only reason I did not manage to make it work at first, and had to post this question, is that I did not know that a ThreadPoolExecutor starts without any threads by default. The threads are created lazily by a thread factory and may be terminated and recreated depending on the core and max sizes of the pool and the number of tasks being submitted concurrently.

    Since thread creation is lazy, my attempts to block on the call to offer failed: SynchronousQueue.offer returns immediately if nobody is waiting to take an element from the queue. Conversely, SynchronousQueue.put blocks until someone takes the item from the queue, which never happens if the thread pool is empty.
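
To make the lazy start visible, here is a short sketch (the class name LazyPoolHandoffDemo, the pool size of 2 and the 5 second timeout are arbitrary choices for illustration). It inspects a fresh pool, tries a non-blocking offer, then starts the core threads with prestartAllCoreThreads, the fix described next, and offers again.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class LazyPoolHandoffDemo {

    /** Returns "threadsBefore offeredBefore prestarted offeredAfter". */
    static String observe() {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue);
        Runnable noop = () -> { };
        try {
            int threadsBefore = pool.getPoolSize();    // workers are created lazily
            boolean offeredBefore = queue.offer(noop); // no worker is waiting on the queue
            int prestarted = pool.prestartAllCoreThreads();
            // The workers now wait on the queue; the timeout gives them time to get there.
            boolean offeredAfter = queue.offer(noop, 5, TimeUnit.SECONDS);
            return threadsBefore + " " + offeredBefore + " " + prestarted + " " + offeredAfter;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "0 false 2 true"
    }
}
```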

    Therefore, the workaround is to force the thread pool to create the core threads eagerly using ThreadPoolExecutor.prestartAllCoreThreads. My problem then becomes fairly trivial. I made a simplified version of my real use-case:

    import java.util.Random;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.atomic.AtomicLong;
    
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;
    
    public class SimplifiedBuildScheduler {
        private static final int MAX_POOL_SIZE = 10;
    
        private static final Random random = new Random();
        private static final AtomicLong nextTaskId = new AtomicLong(0);
    
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
    
            // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
            long maxBuildTimeInMillis = 50;
            // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
            long taskSubmissionTimeoutInMillis = 1;
    
            ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
            pool.prestartAllCoreThreads();
    
            Runnable nextTask = makeTask(maxBuildTimeInMillis);
    
            long millisAtStart = System.currentTimeMillis();
            while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
                boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
                if (submitted) {
                    nextTask = makeTask(maxBuildTimeInMillis);
                } else {
                    System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
                            "the max build time has expired");
                }
            }
    
            System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");
    
            pool.shutdown();
            pool.awaitTermination(9999999, SECONDS);
        }
    
        private static Runnable makeTask(long maxBuildTimeInMillis) {
            long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
            long taskId = nextTaskId.getAndIncrement();
            return () -> {
                try {
                    System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
                    Thread.sleep(sleepTimeInMillis);
                    System.out.println("Task " + taskId + " completed !");
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            };
        }
    
        private static int randomSleepTime(long maxBuildTimeInMillis) {
            // deliberately make it possible for a task to finish after the max build time has expired
            return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
        }
    }
    

    An example of output is the following:

    Task 1 was not submitted. It will be rescheduled unless the max build time has expired
    Task 0 sleeping for 23 ms
    Task 1 sleeping for 26 ms
    Task 2 sleeping for 6 ms
    Task 3 sleeping for 9 ms
    Task 4 sleeping for 75 ms
    Task 5 sleeping for 35 ms
    Task 6 sleeping for 81 ms
    Task 8 was not submitted. It will be rescheduled unless the max build time has expired
    Task 8 was not submitted. It will be rescheduled unless the max build time has expired
    Task 7 sleeping for 86 ms
    Task 8 sleeping for 47 ms
    Task 9 sleeping for 40 ms
    Task 11 was not submitted. It will be rescheduled unless the max build time has expired
    Task 2 completed !
    Task 10 sleeping for 76 ms
    Task 12 was not submitted. It will be rescheduled unless the max build time has expired
    Task 3 completed !
    Task 11 sleeping for 31 ms
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 0 completed !
    Task 12 sleeping for 7 ms
    Task 14 was not submitted. It will be rescheduled unless the max build time has expired
    Task 14 was not submitted. It will be rescheduled unless the max build time has expired
    Task 1 completed !
    Task 13 sleeping for 40 ms
    Task 15 was not submitted. It will be rescheduled unless the max build time has expired
    Task 12 completed !
    Task 14 sleeping for 93 ms
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 5 completed !
    Task 15 sleeping for 20 ms
    Task 17 was not submitted. It will be rescheduled unless the max build time has expired
    Task 17 was not submitted. It will be rescheduled unless the max build time has expired
    Task 11 completed !
    Task 16 sleeping for 27 ms
    Task 18 was not submitted. It will be rescheduled unless the max build time has expired
    Task 18 was not submitted. It will be rescheduled unless the max build time has expired
    Task 9 completed !
    Task 17 sleeping for 95 ms
    Task 19 was not submitted. It will be rescheduled unless the max build time has expired
    Max build time has expired. Stop submitting new tasks and running existing tasks to completion
    Task 8 completed !
    Task 15 completed !
    Task 13 completed !
    Task 16 completed !
    Task 4 completed !
    Task 6 completed !
    Task 10 completed !
    Task 7 completed !
    Task 14 completed !
    Task 17 completed !
    

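    You'll notice, for example, that the last pending task was never rescheduled, because the max build time expired before the scheduler could offer it to the queue a second time. (It is logged as "Task 19" because the message prints nextTaskId.get(), which is one higher than the id of the task still waiting to be submitted, here task 18.) You can also see that all the tasks that started before the max build time expired do run to completion.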

    Note: As mentioned in the code comments, the max build time is a soft requirement, which means that it might not be met exactly, and my solution does allow a task to be submitted even after the max build time has expired. This can happen if the call to offer starts just before the max build time expires and finishes after it. To reduce the odds of this happening, the timeout used in the call to offer must be much smaller than the max build time. In the real system, the thread pool is usually busy with no idle thread, so the probability of this race condition occurring is extremely small. It also has no bad consequence for the system when it does happen, since the max build time is a best-effort attempt to meet an overall running time, not an exact and rigid constraint.
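
As for the error reporting mentioned in the question, one possible way to keep failures observable with this approach is to hand off FutureTask instances instead of bare Runnables. This is only a sketch under my own assumptions, not what the real system does: the class name FailureReportingDemo, the rule that odd task ids fail, and the use of put instead of a timed offer are all made up to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class FailureReportingDemo {

    /** Hands off taskCount tasks (odd ids fail) and returns the failure messages. */
    static List<String> runAndCollectFailures(int taskCount) {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue);
        pool.prestartAllCoreThreads();
        List<FutureTask<Void>> started = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        try {
            for (int id = 0; id < taskCount; id++) {
                final int taskId = id;
                FutureTask<Void> task = new FutureTask<Void>(() -> {
                    if (taskId % 2 == 1) {
                        throw new IllegalStateException("task " + taskId + " failed");
                    }
                    return null;
                });
                queue.put(task);   // blocks until a worker takes the task
                started.add(task); // only tasks that were handed off are tracked
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            for (FutureTask<Void> task : started) {
                try {
                    task.get();
                } catch (ExecutionException e) {
                    failures.add(e.getCause().getMessage()); // the task's own exception
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(runAndCollectFailures(4)); // prints [task 1 failed, task 3 failed]
    }
}
```

Because a task only enters the list once a worker has taken it, nothing ever needs to be cancelled, so every exception surfaces as an ExecutionException with the original cause attached.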