For example -
CompletableFuture.supplyAsync(stage1).acceptEither(stage2, consumer );
stage1 and stage2 are supposed to run in parallel in above code as it is using acceptEither
If I change to acceptEitherAsync
, i see I can get more control over which thread (thread from ForkJoin pool or thread from custom executor pool) runs stage2.
But on running below code (jdoddle link) I found that stage2 is not running in
main` thread but in a ForkJoin pool. How is it possible.
From Java Doc -
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
import java.util.concurrent.CompletableFuture;
public class MyClass {
public static void main(String args[]) {
System.out.println("hello world, CompletableFuturesRightHere");
CompletableFuture.supplyAsync(() -> {
System.out.println("log-sA: " + Thread.currentThread().getName());
System.out.println("log-sA post: " + Thread.currentThread().getName());
return 40;
}).acceptEither(CompletableFuture.supplyAsync(() -> {
System.out.println("log-aE: " + Thread.currentThread().getName());
System.out.println("log-aE post: " + Thread.currentThread().getName());
return 80;
}), a -> { System.out.println(a); } );
}
}
Output:
hello world, CompletableFuturesRightHere
log-aE: ForkJoinPool.commonPool-worker-5
log-sA: ForkJoinPool.commonPool-worker-3
log-aE post: ForkJoinPool.commonPool-worker-5
log-sA post: ForkJoinPool.commonPool-worker-3
40
How can stage2 i.e log-aE run in worker-5, it should run in main or worker-3. But Since stage1 and stage2 are supposed to run in parallel and worker-3 is already running stage1, the only thread left for stage2 is main
.
How could stage2 run in worker-5.
This might be clearer if you don't do everything inline. For example:
// You used 'supplyAsync', which means the 'Supplier' will be executed
// using the common ForkJoinPool (any available thread from that pool)
CompletableFuture<Integer> stage1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Stage 1: " + Thread.currentThread().getName());
return 40;
});
// You used 'supplyAsync' *again*, which means the 'Supplier' will be executed
// using the common ForkJoinPool (any available thread from that pool)
CompletableFuture<Integer> stage2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Stage 2: " + Thread.currentThread().getName());
return 80;
});
/*
* Both 'stage1' and 'stage2' have been set to execute using the common
* ForkJoinPool. The use of 'acceptEither', or even 'acceptEitherAsync',
* has no bearing on that. Because you used 'acceptEither', the
* **CONSUMER** will be executed by the same thread which finished 'stage1'
* or the same thread that finished 'stage2', whichever happens first,
* or the caller thread (typically if 'stage1' and/or 'stage2' are already
* complete by the time 'acceptEither' is invoked).
*/
stage1.acceptEither(stage2, result -> {
System.out.println("Stage acceptEither: " + Thread.currentThread().getName());
System.out.println("Result = " + result);
});
Note: supplyAsync
has an overload that accepts an Executor
, allowing you to specify a thread pool other than the common ForkJoinPool
. The same is true for all the xxxAsync
methods.