FYI, my PC has four cores.
CompletableFuture uses ForkJoinPool.commonPool() by default, as its official documentation points out:
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
I debugged and found the following code behind CompletableFuture.supplyAsync(Supplier<U> supplier):
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
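To see which branch applies on a given machine, the relevant values can be printed directly. This is only a minimal sketch: the class name CommonPoolInfo and the helper usesCommonPool() are made up for illustration and simply mirror the check quoted above. As far as I know, the common pool's parallelism defaults to one less than the number of available processors, unless the java.util.concurrent.ForkJoinPool.common.parallelism system property overrides it.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Hypothetical helper mirroring the useCommonPool check quoted above:
    // the common pool is used only when its parallelism is greater than one.
    public static boolean usesCommonPool() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    public static void main(String[] args) {
        System.out.println("available processors:         "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism:      "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("supplyAsync uses common pool: " + usesCommonPool());
    }
}
```

The exact numbers depend on the machine and on any system properties set for the JVM, so no output is shown here.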
This means CompletableFuture uses ForkJoinPool.commonPool(), just as parallelStream always does, so why is it quicker here?
I printed the thread names and found that only three threads are used with CompletableFuture:
    private static int concurrencyGet() {
        List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, 10).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> getNumber(i)))
            .collect(Collectors.toList());
        return futureList.stream().map(future -> future.join()).reduce(0, Integer::sum);
    }
But parallelStream uses four threads, including the main thread.
My guess is that in CompletableFuture.supplyAsync(), ForkJoinPool.getCommonPoolParallelism() is only three, with the main thread taking one of the four, since the call is asynchronous.
parallelStream, on the other hand, uses all four, since it is not asynchronous.
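The thread counts described above can be reproduced with a sketch along these lines. The class ThreadUsage and the method slowIdentity are hypothetical (slowIdentity stands in for getNumber(i), whose body is not shown), and IntStream.parallel() is used here in place of a parallelStream() call on a list. It records the names of the threads that actually ran the tasks in each variant.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ThreadUsage {

    // Hypothetical stand-in for getNumber(i): sleeps briefly so that tasks overlap.
    public static int slowIdentity(int i) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // Names of the threads that executed the suppliers passed to supplyAsync.
    public static Set<String> threadsUsedBySupplyAsync() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(0, 10).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> {
                names.add(Thread.currentThread().getName());
                return slowIdentity(i);
            }))
            .collect(Collectors.toList());
        futures.forEach(CompletableFuture::join); // wait for all tasks to finish
        return names;
    }

    // Names of the threads that executed the body of a parallel stream.
    public static Set<String> threadsUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.rangeClosed(0, 10).parallel().forEach(i -> {
            names.add(Thread.currentThread().getName());
            slowIdentity(i);
        });
        return names;
    }

    public static void main(String[] args) {
        System.out.println("supplyAsync:    " + threadsUsedBySupplyAsync());
        System.out.println("parallelStream: " + threadsUsedByParallelStream());
    }
}
```

If the common pool has its default parallelism of availableProcessors() - 1, a four-core machine would give three worker threads. The calling thread only waits in join() for the supplyAsync variant, whereas a parallel stream's terminal operation usually also does work on the calling thread, which would be consistent with the three versus four threads observed.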
Is this correct? Is there any official documentation on this?
Thanks for the help.
The following is how I understood it from Venkat Subramaniam's talk on Parallel and Asynchronous Programming with Streams and CompletableFuture:
As CompletableFuture also uses ForkJoinPool.commonPool(), it may as well use the main thread, and it does under certain circumstances.
Given the following example:
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> numberSupplier());
        future.thenAccept(i -> System.out.println("f: " + i + " - " + Thread.currentThread()));
        sleep(100); //wait for async operations to finish before exiting
    }

    private static Integer numberSupplier() {
        Integer n = 2;
        System.out.println("c: " + n + " - " + Thread.currentThread());
        sleep(19);
        return n;
    }

    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
you might get a console output like this:
c: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]
f: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]
Both the supplyAsync(..) and the thenAccept(..) parts are executed by a worker thread from the ForkJoinPool.
However, if the Supplier<Integer> given to supplyAsync(..) is so fast that it has already finished by the time thenAccept(..) is invoked, then that second part might just as well be executed in the main thread:
    private static Integer numberSupplier() {
        Integer n = 2;
        //System.out.println("c: " + n + " - " + Thread.currentThread());
        //sleep(19);
        return n;
    }
Output:
f: 2 - Thread[main,5,main]
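The "already finished" case can be made deterministic by attaching the callback to a future that is complete from the start. The sketch below is only an illustration of that effect, not code from the talk; the class CallbackThread and its method names are made up. It also shows thenAcceptAsync, which hands the callback to the default executor instead of running it on the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThread {

    // Name of the thread that runs a thenAccept callback attached to a
    // future that is already complete: the callback runs immediately,
    // on the thread that calls thenAccept.
    public static String syncCallbackThread() {
        AtomicReference<String> name = new AtomicReference<>();
        CompletableFuture.completedFuture(2)
            .thenAccept(i -> name.set(Thread.currentThread().getName()));
        return name.get();
    }

    // Same, but with thenAcceptAsync: the callback is submitted to the
    // default asynchronous executor rather than run by the caller.
    public static String asyncCallbackThread() {
        AtomicReference<String> name = new AtomicReference<>();
        CompletableFuture.completedFuture(2)
            .thenAcceptAsync(i -> name.set(Thread.currentThread().getName()))
            .join(); // wait until the callback has run
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept ran on:      " + syncCallbackThread());
        System.out.println("thenAcceptAsync ran on: " + asyncCallbackThread());
    }
}
```

So if the callback must not run on the calling thread, using the ...Async variants (optionally with an explicit Executor) is one way to avoid depending on timing.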