Tags: java, multithreading, concurrency, java.util.concurrent, completable-future

Why is the 'execute' method of ExecutorService in thenApplyAsync sometimes called from the main thread?


I have overridden the execute method of java.util.concurrent.Executor in my ThreadPoolExecutor subclass. The new implementation just decorates the Runnable and then calls the original execute. The issue I'm having is that, with two such executors, the following:

supplyAsync(() -> foo(), firstExecutor).thenApplyAsync(firstResult -> bar(), secondExecutor)

translates to two execute calls. Usually they are made by main and firstExecutor respectively, but sometimes both are made by main.

Does it depend on how long it takes to complete the Supplier in supplyAsync?

Here's a minimal reproducible example (10k repetitions; for me it fails about 3 times with java.lang.AssertionError: Unexpected second decorator: main):

package com.foo;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DecorationTest {

    record WhoCalled(String decorator, String runnable) {}

    static class DecoratedExecutor extends ThreadPoolExecutor{

        private final List<WhoCalled> callers;

        public DecoratedExecutor(List<WhoCalled> callers, String threadName) {
            super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), runnable -> new Thread(runnable, threadName));
            this.callers = callers;
        }

        @Override
        public void execute(final Runnable command) {
            String decoratingThread = Thread.currentThread().getName();
            Runnable decorated = () -> {
                String runningThread = Thread.currentThread().getName();
                callers.add(new WhoCalled(decoratingThread, runningThread));
                command.run();
            };
            super.execute(decorated);
        }
    }

    List<WhoCalled> callers;
    ExecutorService firstExecutor;
    ExecutorService secondExecutor;

    @BeforeEach
    void beforeEach() {
        callers = new ArrayList<>();
        firstExecutor = new DecoratedExecutor(callers, "firstExecutor");
        secondExecutor = new DecoratedExecutor(callers, "secondExecutor");
    }

    @AfterEach
    void afterEach() {
        firstExecutor.shutdown();
        secondExecutor.shutdown();
    }


    @RepeatedTest(10_000)
    void testWhoCalled() throws Exception {
        Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
                .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
                .get();

        assert result == 1;

        WhoCalled firstCallers = callers.get(0);
        assert firstCallers.decorator().equals("main");
        assert firstCallers.runnable().equals("firstExecutor");

        WhoCalled secondCallers = callers.get(1);
        assert secondCallers.decorator().equals("firstExecutor") : "Unexpected second decorator: " + secondCallers.decorator();
        assert secondCallers.runnable().equals("secondExecutor");
    }
}

Solution

  • Does it depend on how long it takes to complete the Supplier in supplyAsync?

    It depends on whether the supplyAsync stage has already completed at the moment thenApplyAsync is called. A longer-running Supplier makes that less likely, so its duration matters only indirectly.

    Explanation

    To make the explanation easier, let's split the CompletableFuture chain in the testWhoCalled test. Instead of:

    Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
             .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
             .get();
    

    do:

    CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
    Integer result = firstFuture
             .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
             .get();
    

    The test still fails for me occasionally. The logic is unchanged, but this form is easier to explain.

    As soon as supplyAsync creates firstFuture, firstExecutor may already start executing the lambda's (Supplier's) body.

    In other words, the computation may already be running. This differs from some other frameworks; in Project Reactor, for example, nothing happens until you subscribe.
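A minimal sketch of this eager behaviour, using only the JDK (the class and method names are hypothetical): the future returned by supplyAsync is never stored, joined, or given a callback, yet the Supplier still runs and counts down a latch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EagerStartDemo {

    // Returns true if the Supplier ran although nobody called get()/join()
    // or attached a callback to the returned future.
    static boolean runsWithoutSubscription() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            // The returned future is deliberately ignored: there is no "subscribe" step.
            CompletableFuture.supplyAsync(() -> {
                started.countDown(); // signal that the Supplier body is running
                return 1;
            }, executor);
            // Bounded wait, so a lazy implementation would yield false instead of hanging.
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Supplier ran without get(): " + runsWithoutSubscription());
    }
}
```

Nothing comparable to subscribe() is ever called; if CompletableFuture were lazy, the latch wait would time out and the method would return false.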

    So by the time supplyAsync returns, the computation may already be running or even finished. There are two possibilities:

    1. firstExecutor has already executed the () -> 1 lambda, and the CompletableFuture holds the result.
    2. The () -> 1 lambda is still executing or has not started yet, so there is no result.
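isDone() takes a snapshot of which possibility holds at a given instant. The sketch below (hypothetical names; plain JDK executors plus a recording Executor wrapper stand in for DecoratedExecutor) repeats the chain, records the thread that calls execute() on the second executor, and checks that whenever isDone() was already true, the calling thread did the submission. The reverse is deliberately not checked: the first stage can complete between the isDone() call and the thenApplyAsync call, so a false snapshot can go stale.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class WhichCaseDemo {

    // Runs the chain `runs` times. Returns {timesAlreadyDone, timesNotDoneYet},
    // based on an isDone() snapshot taken right before thenApplyAsync.
    static int[] countCases(int runs) {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "firstExecutor"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "secondExecutor"));
        String caller = Thread.currentThread().getName();
        int[] counts = new int[2];
        try {
            for (int i = 0; i < runs; i++) {
                AtomicReference<String> submitter = new AtomicReference<>();
                // Records which thread calls execute(), then delegates to the real executor.
                Executor recordingSecond = task -> {
                    submitter.set(Thread.currentThread().getName());
                    second.execute(task);
                };
                CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, first);
                boolean alreadyDone = firstFuture.isDone(); // possibility 1 (true) or 2 (false)
                firstFuture.thenApplyAsync(x -> x, recordingSecond).join();
                if (alreadyDone) {
                    counts[0]++;
                    // Possibility 1: only the calling thread can have submitted the task.
                    if (!caller.equals(submitter.get())) {
                        throw new IllegalStateException("Unexpected submitter: " + submitter.get());
                    }
                } else {
                    // Possibility 2 at snapshot time; the stage may still have finished
                    // before thenApplyAsync ran, so either thread may be the submitter.
                    counts[1]++;
                }
            }
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countCases(10_000);
        System.out.println("already done: " + counts[0] + ", not done yet: " + counts[1]);
    }
}
```

The split between the two counters depends on timing and varies from run to run.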

    Now we do:

    firstFuture.thenApplyAsync(...)
    

    Which thread submits the second lambda to secondExecutor?

    It is either the main thread or a thread from firstExecutor:

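    • If we already have the result, the main thread submits the task to secondExecutor, directly inside the thenApplyAsync call. No other thread can do it: the first task has already finished, so firstExecutor has nothing left that could trigger the submission.
    • If we don't have the result yet, thenApplyAsync only registers the second lambda as a dependent action. When the firstExecutor thread later completes the future, that thread triggers the dependent actions and submits the task to secondExecutor.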
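Both cases can be forced deterministically. In this sketch (hypothetical names; a recording Executor wrapper stands in for DecoratedExecutor), a second CompletableFuture acts as a gate that holds the Supplier back. With completeFirstStageEarly set, the first stage is finished with join() before thenApplyAsync is called; otherwise the gate is opened only after thenApplyAsync has registered its callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class WhoSubmitsDemo {

    // Returns the name of the thread that calls execute() on the second executor.
    static String submitterOfSecondTask(boolean completeFirstStageEarly) {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "firstExecutor"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "secondExecutor"));
        AtomicReference<String> submitter = new AtomicReference<>();
        // Records which thread calls execute(), then delegates to the real executor.
        Executor recordingSecond = task -> {
            submitter.set(Thread.currentThread().getName());
            second.execute(task);
        };
        // Gate that keeps the Supplier from finishing until it is opened.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        try {
            CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> {
                gate.join();
                return 1;
            }, first);

            if (completeFirstStageEarly) {
                // Case 1: open the gate and wait until the first stage has its result.
                gate.complete(null);
                firstFuture.join();
            }

            CompletableFuture<Integer> secondFuture = firstFuture.thenApplyAsync(x -> x, recordingSecond);

            // Case 2: the callback is registered; only now let the first stage finish.
            gate.complete(null); // no-op if already completed
            secondFuture.join();
            return submitter.get();
        } finally {
            gate.complete(null); // never leave the first executor's thread blocked
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("first stage already done: " + submitterOfSecondTask(true));
        System.out.println("first stage still pending: " + submitterOfSecondTask(false));
    }
}
```

Run from main, it prints "first stage already done: main" and "first stage still pending: firstExecutor". A sleep only makes case 1 likely; join() makes it certain.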

    Experiment: add a sleep in between

    If I add a sleep to the test like this:

    CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
    Thread.sleep(1);
    Integer result = firstFuture
            .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
            .get();
    

    I get 9986 failures out of 10,000 repetitions. The sleep increases the probability that firstExecutor has already completed the task before thenApplyAsync registers the new callback.

    See also the talk "Asynchronous API with CompletableFuture: Performance Tips and Tricks".