I am learning concurrency through the CompletableFuture API. Let's say I have two tasks: one takes 250ms and another takes 2500ms. In the following code:
Supplier<List<Long>> supplyIds = () -> {
sleep(200);
return(Arrays.asList(1L, 2L, 3L));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
sleep(250);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
sleep(2500);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Consumer<List<User>> displayer = users -> {
users.forEach(System.out::println);
};
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
sleep(6000);
I get the following result:
User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1
I understand that the code is running synchronously since the same common fork join pool thread is being used and we don't specify the thread. I am confused as to why the fetchUsers2
task is being executed first and then fetchUsers1
task (this seems to be consistent with each run). I assumed that since thenCompose
is called on fetchUsers1
first in the code, it would be 'queued up' first.
Nothing in the documentation says that the order of invocation matters for thenCompose
.
Since you define two independent stages, both only depending on completableFuture
, there is no defined order between users1
and user2
and the resulting order is just implementation dependent.
You may reproducibly get a particular order in one environment, but a different order in a different environment. Even in your environment, it’s possible to get a different order in some runs. If the initiating thread loses the CPU after calling supplyAsync(supplyIds)
for 200 milliseconds, it might execute the action specified with thenCompose(fetchUsers1)
immediately at the invocation, before the invocation of thenCompose(fetchUsers2)
.
When the order between two actions matters, you must model a dependency between them.
Note that likewise the code
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
defines entirely independent actions. Since acceptEither
is applied to users1
and users2
, rather than the completion stages returned by the thenRun
calls, it is not dependent on the completion of the print statements. These three actions may be performed in any order.