I have overridden the execute method of java.util.concurrent.Executor in a ThreadPoolExecutor implementation. The new implementation just decorates the runnable and then calls the original execute. The issue I'm having is that if I have two such executors, then the following:
supplyAsync(() -> foo(), firstExecutor).thenApplyAsync(firstResult -> bar(), secondExecutor)
translates to two execute calls. Usually they are made by main and firstExecutor, but sometimes both are made by main.
Does it depend on how long it takes to complete the Supplier in supplyAsync?
Here's a minimal reproducible example (10k repeats; for me about 3 of them fail with java.lang.AssertionError: Unexpected second decorator: main):
package com.foo;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DecorationTest {

    // decorator: thread that called execute(); runnable: thread that ran the task
    record WhoCalled(String decorator, String runnable) {}

    static class DecoratedExecutor extends ThreadPoolExecutor {
        private final List<WhoCalled> callers;

        public DecoratedExecutor(List<WhoCalled> callers, String threadName) {
            super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), runnable -> new Thread(runnable, threadName));
            this.callers = callers;
        }

        @Override
        public void execute(final Runnable command) {
            // Captured on the submitting thread, before the task is queued
            String decoratingThread = Thread.currentThread().getName();
            Runnable decorated = () -> {
                // Captured on the pool thread that actually runs the task
                String runningThread = Thread.currentThread().getName();
                callers.add(new WhoCalled(decoratingThread, runningThread));
                command.run();
            };
            super.execute(decorated);
        }
    }

    List<WhoCalled> callers;
    ExecutorService firstExecutor;
    ExecutorService secondExecutor;

    @BeforeEach
    void beforeEach() {
        callers = new ArrayList<>();
        firstExecutor = new DecoratedExecutor(callers, "firstExecutor");
        secondExecutor = new DecoratedExecutor(callers, "secondExecutor");
    }

    @AfterEach
    void afterEach() {
        firstExecutor.shutdown();
        secondExecutor.shutdown();
    }

    @RepeatedTest(10_000)
    void testWhoCalled() throws Exception {
        Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
                .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
                .get();
        assert result == 1;

        WhoCalled firstCallers = callers.get(0);
        assert firstCallers.decorator().equals("main");
        assert firstCallers.runnable().equals("firstExecutor");

        WhoCalled secondCallers = callers.get(1);
        assert secondCallers.decorator().equals("firstExecutor") : "Unexpected second decorator: " + secondCallers.decorator();
        assert secondCallers.runnable().equals("secondExecutor");
    }
}
Does it depend on how long it takes to complete the Supplier in supplyAsync?
This depends on whether or not the supplyAsync part has completed before the thenApplyAsync call.
Let's split the CompletableFuture chain in the testWhoCalled test to make the explanation easier. Instead of:
Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
        .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
        .get();
do:
CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
Integer result = firstFuture
        .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
        .get();
The test still fails for me sometimes. The logic is unchanged, but this form is easier to explain.
When I create firstFuture using the supplyAsync method, firstExecutor can start executing the lambda's (Supplier's) body right away.
In other words, the computation may already be running.
This is different from some other frameworks. For example, in Project Reactor nothing happens until you subscribe.
So the computation may already be running, or even finished, by the time we return from supplyAsync.
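The eager start can be observed in isolation. The following is a minimal sketch, not part of the original test: the class name EagerStartDemo, the method name, and the 5-second timeout are made up for illustration. It submits a supplier and never calls get() or join(), yet the supplier still signals that it started.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the question.
class EagerStartDemo {

    /** Returns true if the supplier started although nobody asked for its result. */
    static boolean supplierRunsWithoutGet() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch supplierStarted = new CountDownLatch(1);
        try {
            // No get(), join(), or callback follows: supplyAsync alone hands the task to the executor.
            CompletableFuture.supplyAsync(() -> {
                supplierStarted.countDown();
                return 1;
            }, executor);
            // Wait, with a generous (arbitrary) timeout, for the supplier to signal that it started.
            return supplierStarted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("supplier ran without get(): " + supplierRunsWithoutGet());
    }
}
```

Because the task is already in flight when supplyAsync returns, any callback attached afterwards is racing against its completion.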
Let's understand what happens next.
There are two possibilities:
1. firstExecutor has already executed the () -> 1 lambda, and the CompletableFuture has the result.
2. The () -> 1 lambda is executing or will be executed later. We don't have the result yet.
Now we do:
firstFuture.thenApplyAsync(...)
Who will submit the second lambda to secondExecutor?
Either the main thread or a thread from firstExecutor will do it, depending on which of the two cases we are in:
1. The main thread submits the task to secondExecutor. No one else can do it here: the first task is already done, so firstExecutor has nothing left to run and cannot submit the task to secondExecutor.
2. firstExecutor submits the task to secondExecutor once it completes the first stage.
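Both cases can be forced deterministically with a latch instead of relying on timing. This is only a sketch under assumptions: SubmitterDemo, submittingThread, and the wrapper around the second pool are hypothetical names that stand in for the DecoratedExecutor from the question, and the behaviour shown is what the OpenJDK implementation of CompletableFuture does rather than something the Javadoc spells out for the async variants.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not taken from the question.
class SubmitterDemo {

    /** Returns the name of the thread that called execute() on the second executor. */
    static String submittingThread(boolean firstStageDoneBeforeChaining) {
        ExecutorService firstExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "firstExecutor"));
        ExecutorService secondPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "secondExecutor"));
        AtomicReference<String> submitter = new AtomicReference<>();
        // Stand-in for the decorating executor: record who submits, then delegate.
        Executor secondExecutor = command -> {
            submitter.set(Thread.currentThread().getName());
            secondPool.execute(command);
        };
        CountDownLatch callbackAttached = new CountDownLatch(1);
        try {
            CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> {
                if (!firstStageDoneBeforeChaining) {
                    // Case 2: hold the first stage open until the callback is attached.
                    await(callbackAttached);
                }
                return 1;
            }, firstExecutor);
            if (firstStageDoneBeforeChaining) {
                // Case 1: make sure the first stage has a result before chaining.
                firstFuture.join();
            }
            CompletableFuture<Integer> secondFuture =
                    firstFuture.thenApplyAsync(x -> x, secondExecutor);
            callbackAttached.countDown();
            secondFuture.join();
            return submitter.get();
        } finally {
            firstExecutor.shutdown();
            secondPool.shutdown();
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("first stage already done:  " + submittingThread(true));
        System.out.println("first stage still running: " + submittingThread(false));
    }
}
```

With the first stage already complete, the thread that calls thenApplyAsync is the one that submits; with the first stage held open, the firstExecutor thread submits when it completes the stage. The test in the question simply lands in one case or the other depending on scheduling.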
If I add a sleep to the test like this:
CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
Thread.sleep(1);
Integer result = firstFuture
        .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
        .get();
I get 9986 failures out of the 10,000 repetitions.
The sleep call increases the probability that firstExecutor has completed the task before we add a new callback in thenApplyAsync.
See also "Asynchronous API with CompletableFuture: Performance Tips and Tricks" talk