java multithreading concurrency

Is there a Java Future implementation that will execute in caller thread if pool is busy?


We use an ExecutorService to make various REST calls to cloud storage in parallel. We are concerned that the pool can fill up with blocked calls that have run amok, which would then hold up newer calls for a long time (cloud timeouts can last several minutes once all of the retries are exhausted). Of course we can always add more and more threads, but one solution I've implemented in C++ is to have Future.get() run the task in the caller thread if the task has not yet started. Java almost does this with Executors.newWorkStealingPool(), but the run-in-caller logic is only enabled if the caller is itself a pool thread:

class ForkJoinTask {
  private int awaitDone(...) {
      ...
      if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        // execute in caller thread

This is how work-stealing keeps recursive algorithms from running out of threads. But if the caller is a normal thread, this logic isn't triggered. Using ForkJoinPool directly is no different, because Executors.newWorkStealingPool() is just a wrapper around it. Is there anything in Java that supports what I'm after?

PS: We can't use JDK 21 with its virtual threads yet :-(

UPDATE: A little more explanation about the use case. We find that in order to answer certain questions about cloud storage like "should this path be treated as a directory or a file", we have to ask several questions:

  • Does a blob exist with that name?
  • Does a blob exist with that name followed by a / ?
  • Do other blobs exist that have that path as a prefix?

The executor lets us ask these questions in parallel and reduce the latency, so we have a shared executor that everyone uses to ask these questions of the cloud storage endpoints. BUT... occasionally one of the endpoints will freak out and not answer questions until it either recovers or times out, which can take several minutes. When this happens, we don't want the delayed requests to clog the executor for everyone else and make every request wait. The "caller executes when busy" pattern gives us a safety valve: everything runs a little slower for a few minutes, but not a lot slower.


Solution

  • If you use a standard ThreadPoolExecutor, the "CallerRuns" policy does what you want:

    https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

    From Javadoc:

    A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.

    Here is an explanation about how rejection happens:

    Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

    • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
    • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
    • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
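
The three rules quoted above can be watched in action with a small sketch. The sizes used here (core 1, maximum 2, queue capacity 1) and the class name PoolSizingDemo are assumptions picked for illustration, not values from the question. The pool keeps the default AbortPolicy so that the rejection shows up as an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingDemo {

    /** Submits four blocking tasks and records what the pool did with each one. */
    public static List<String> run() {
        // Illustrative sizes (assumption): core 1, max 2, bounded queue of 1.
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);

        // Stand-in for a stalled call: blocks until the latch is released.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    exec.execute(blocked);
                    log.add("task " + i + ": threads=" + exec.getPoolSize()
                            + " queued=" + exec.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: queue is full and pool is at maximum size.
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            exec.shutdown();
            try {
                exec.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With these sizes, the first task starts the core thread, the second is queued, the third forces a second thread, and the fourth is rejected. Installing CallerRunsPolicy would turn that last step into a run in the submitting thread.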

    UPDATE FROM OP: Use a SynchronousQueue to avoid queueing, and it works like a charm:

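    // SynchronousQueue holds no tasks, so a task is rejected as soon as the single thread is busy.
    ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
    // Run a rejected task in the submitting thread instead of throwing.
    exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());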
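
To check that this configuration behaves as described, here is a minimal self-contained sketch. The class name CallerRunsDemo, the method name, and the latch that stands in for a stalled REST call are all hypothetical. The method returns true when a task submitted while the only pool thread is stuck ends up running in the submitting thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Returns true if the overflow task ran in the thread that submitted it. */
    public static boolean overflowRunsInCaller() {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the single pool thread (stand-in for a stalled call).
            exec.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            Thread caller = Thread.currentThread();
            Callable<Thread> whereAmI = Thread::currentThread;

            // The pool is at its maximum and the queue cannot hold the task,
            // so it is rejected and CallerRunsPolicy runs it inside submit().
            Future<Thread> ranOn = exec.submit(whereAmI);
            return ranOn.get() == caller;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();   // unblock the pool thread
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller: " + overflowRunsInCaller());
    }
}
```

One difference from the Future.get() behaviour described in the question: here the overflow task runs at submission time, inside submit() or execute(), so the submitting thread is occupied until that task completes rather than only when it later asks for the result.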