Here's the scenario: a call may randomly generate some data; if it does, that data needs to be fetched recursively, and in the end I need to collect all the generated data.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class Main {

    interface DataProvider {
        List<String> randomData(String url);
    }

    public static void main(String[] args) {
        List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
            final Random random = new Random();

            @Override
            public List<String> randomData(String url) {
                if (random.nextBoolean()) {
                    System.out.println("provide some data");
                    return List.of(UUID.randomUUID().toString());
                }
                return null;
            }
        }, List.of("a", "b", "c"));
        System.out.println("results are: ");
        System.out.println(strings);
    }

    private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
        if (items == null || items.isEmpty())
            return new ArrayList<>();
        List<CompletableFuture<List<String>>> collect =
                items.stream()
                        .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                        .collect(Collectors.toList());
        List<CompletableFuture<List<String>>> list = new ArrayList<>();
        collect.forEach(item -> {
            CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
            list.add(listCompletableFuture);
        });
        return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
    }
}
```
Sometimes the program freezes, and sometimes it prints an empty collection (even though "provide some data" is printed).
Where did I go wrong? I'm not at all familiar with `CompletableFuture`, so maybe the whole recursive invocation is wrong. (Or maybe the code could be simpler, since `CompletableFuture` has a lot of methods.)
There are indeed some issues with your code:

**Always empty result**

The result of `fetch()` will be either:

- an empty list if `items` is empty, or
- the concatenation of `fetch(randomData(item))` for each item in the `items` list.

Since there is no recursion leaf that returns a non-empty list, and parent calls don't add anything to the list either, it can only return an empty result. Maybe you wanted to include the output of `randomData()` in the result as well?
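If that was the intent, one possible fix is sketched below under a few assumptions. It keeps the blocking structure of the original `fetch()` but adds each item's own data to the result before the data found by recursing into it. To get a reproducible result, the random provider is replaced by a hypothetical one that appends `"x"` to a string until it is three characters long, and the code runs on a `ForkJoinPool` for the reason explained in the second issue. The class name `FetchWithLeaves` and the `run()` helper are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class FetchWithLeaves {

    interface DataProvider {
        List<String> randomData(String url);
    }

    // Same shape as the original fetch(), except that each item's own data is
    // added to the result before the data produced by recursing into it.
    static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
        if (items == null || items.isEmpty())
            return new ArrayList<>();
        List<CompletableFuture<List<String>>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                .map(cf -> cf.thenApplyAsync(data -> {
                    List<String> result = new ArrayList<>();
                    if (data != null)
                        result.addAll(data);                      // this level's data
                    result.addAll(fetch(es, dataProvider, data)); // data found by recursing
                    return result;
                }, es))
                .collect(Collectors.toList());
        // Still blocks in join(), hence the ForkJoinPool in run()
        return futures.stream()
                .flatMap(cf -> cf.join().stream())
                .collect(Collectors.toList());
    }

    // Hypothetical deterministic provider: appends "x" until the string has 3 characters.
    static List<String> run() {
        ExecutorService es = new ForkJoinPool(4);
        try {
            return fetch(es, url -> url.length() < 3 ? List.of(url + "x") : null, List.of("a", "b", "c"));
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [ax, axx, bx, bxx, cx, cxx]
    }
}
```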
**`CompletableFuture.join()` inside a fixed thread pool**

You are using `Executors.newFixedThreadPool(4)`. As the name indicates, the number of threads is fixed, so once all threads are busy, new tasks are queued.

The problem is that you are using `join()` to wait for some of those new tasks. On one hand you are blocking one of the threads, and on the other hand you are waiting for something you just put on the queue. Because the calls are recursive, as soon as all 4 threads are blocked in `join()` (for instance when the recursion gets 4 levels deep), none of the queued tasks can run anymore and the program deadlocks.

The simplest way to prevent this is to use a `ForkJoinPool`. This kind of pool can spawn additional threads to compensate when one of its threads blocks in `join()`.

As a side note, a `ForkJoinPool` uses daemon threads, so they don't prevent the JVM from terminating. If you want to use a fixed thread pool, you have to call `shutdown()` on it so that the JVM can exit, or configure it to use daemon threads.
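The starvation can be reproduced without any recursion. In the sketch below (the class `StarvationDemo` and its methods are hypothetical), an outer task submits an inner task to the same pool and then waits for it; with a single-thread fixed pool, the inner task can never start while the outer one holds the only thread. It uses a timed `get()` instead of `join()` so that the demo reports the problem instead of hanging. On a `ForkJoinPool` the wait is expected to succeed because the pool can add a compensating thread, although the exact compensation policy has changed between JDK versions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationDemo {

    // An outer task submits an inner task to the same pool, then blocks on it.
    // A timed get() is used instead of join() so that the demo cannot hang.
    static boolean innerTaskRan(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> "done", pool);
            try {
                inner.get(1, TimeUnit.SECONDS); // occupies a pool thread while waiting
                return true;
            } catch (TimeoutException e) {
                return false;                   // the inner task never got a thread
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }, pool).join();                        // called from main, not from a pool thread
    }

    static boolean runOnFixedPool() {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        try {
            return innerTaskRan(fixed);
        } finally {
            fixed.shutdown(); // non-daemon threads: without this the JVM would not exit
        }
    }

    static boolean runOnForkJoinPool() {
        ExecutorService forkJoin = new ForkJoinPool(1);
        try {
            return innerTaskRan(forkJoin);
        } finally {
            forkJoin.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool of 1:   " + runOnFixedPool()); // false
        System.out.println("ForkJoinPool of 1: " + runOnForkJoinPool());
    }
}
```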
Your code builds a lot of intermediate collections and blocks on a lot of `join()` calls. `CompletableFuture` is designed to chain related stages of computation, thanks to its implementation of `CompletionStage`, not merely to be used as a `Future` that can be completed.
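As a small illustration (the class `ChainingDemo` is hypothetical and uses the common pool), each stage below is registered to run when the previous one completes, and the only blocking call is the single `join()` made by the caller at the very end:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainingDemo {

    // Each method returns a new stage that runs when the previous one completes;
    // nothing in this method blocks a thread while waiting.
    static CompletableFuture<String> pipeline() {
        return CompletableFuture.supplyAsync(() -> List.of("a", "b", "c"))      // produce a value asynchronously
                .thenApply(list -> String.join(",", list))                      // transform it
                .thenCompose(joined -> CompletableFuture.supplyAsync(joined::toUpperCase)); // chain another async step
    }

    public static void main(String[] args) {
        // join() is called only once, at the very end, by the caller
        System.out.println(pipeline().join()); // A,B,C
    }
}
```

`thenApply()` is for a plain transformation, while `thenCompose()` is for a step that itself returns a future, which avoids ending up with a nested `CompletableFuture<CompletableFuture<String>>`.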
Without changing your current logic, you could first make `fetch()` return a `Stream`, which removes a lot of `collect()`/`stream()` calls:
```java
private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return Stream.empty();
    List<CompletableFuture<Stream<String>>> list =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
                    .collect(Collectors.toList());
    return list.stream().flatMap(CompletableFuture::join);
}
```
To get there, here is what was changed:

- replace the `forEach()`/`list.add()` with just a `collect.stream(…).map(…).collect(toList())`
- inline the `collect` variable
- remove the `.collect(toList()).stream()`, as it is redundant
- change the return type to `Stream<String>`, the main changes being:
  - change the type of `list` to `List<CompletableFuture<Stream<String>>>` (as the inner `Stream` comes from the second `map(… -> fetch())` call)
  - remove the `collect()` and `stream()` from the return statement

Note that you need to keep the intermediate `list` to make sure that all tasks are submitted before the first call to `CompletableFuture::join`.
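To see why the intermediate `list` matters, the sketch below (the class `IntermediateListDemo` is hypothetical) logs when each task is submitted and when it is joined. Because a sequential stream pushes one element at a time through the whole pipeline, dropping the `collect()` means each task is joined before the next one is even submitted, which serializes the work:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IntermediateListDemo {

    // Records when each task is submitted and when its result is joined.
    static List<String> run(boolean keepIntermediateList) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService es = Executors.newFixedThreadPool(4);
        try {
            Stream<CompletableFuture<String>> futures = Stream.of("a", "b", "c")
                    .map(item -> {
                        log.add("submit " + item);
                        return CompletableFuture.supplyAsync(() -> item, es);
                    });
            if (keepIntermediateList) {
                // collect() is a terminal operation: every task is submitted here
                futures = futures.collect(Collectors.toList()).stream();
            }
            // Without the list, each element is submitted and then joined
            // before the next element is even submitted.
            futures.forEachOrdered(cf -> log.add("join " + cf.join()));
        } finally {
            es.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [submit a, submit b, submit c, join a, join b, join c]
        System.out.println(run(false)); // [submit a, join a, submit b, join b, submit c, join c]
    }
}
```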
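This is better, but the call is still synchronous: the `join()` calls are merely deferred until the caller consumes the returned stream, and that thread then blocks on each future in turn. Since the logic inside `fetch()` is mostly asynchronous, why not make the whole method asynchronous?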
This mainly requires changing the return type to `CompletableFuture<Stream<String>>`, replacing `thenApplyAsync()` with `thenComposeAsync()` (since `fetch()` now returns a future), and using `allOf()` to create a future that completes when all intermediate futures are completed:
```java
private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return CompletableFuture.completedFuture(Stream.empty());
    List<CompletableFuture<Stream<String>>> list =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
                    .collect(Collectors.toList());
    return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
            .thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
}
```
This is now 100% asynchronous, because `join()` is only called on futures that are known to be completed (thanks to `allOf()`), and thus never blocks.
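The `allOf()`-then-`join()` pattern is not specific to this problem. A generic helper that turns a list of futures into a future of a list could look like the sketch below; `allAsList` and the class `AllOfDemo` are illustrative names, not JDK methods:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AllOfDemo {

    // Hypothetical helper (not part of the JDK): List<CompletableFuture<T>> -> CompletableFuture<List<T>>.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // this stage only runs once every future is done, so join() returns immediately
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<Integer> run() {
        List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3)
                .map(i -> CompletableFuture.supplyAsync(() -> i * 10))
                .collect(Collectors.toList());
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [10, 20, 30]
    }
}
```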
It does not need a `ForkJoinPool` anymore. In fact, it can even run on a `newFixedThreadPool(1)`! Well, as long as you call `shutdown()` on it at the end…
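Putting it together, here is a sketch of a complete program around the asynchronous `fetch()`, running on `newFixedThreadPool(1)` and shutting the pool down at the end. It assumes you also want each level's data in the result (the "always empty result" issue above), so it makes one hypothetical change to `fetch()`: the data returned by `randomData()` is concatenated in front of the recursive results. The random provider is replaced by a deterministic stand-in that appends `"x"` until the string is three characters long, and the class name `AsyncFetch` is illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsyncFetch {

    interface DataProvider {
        List<String> randomData(String url);
    }

    private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
        if (items == null || items.isEmpty())
            return CompletableFuture.completedFuture(Stream.empty());
        List<CompletableFuture<Stream<String>>> list = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                .map(cf -> cf.thenComposeAsync(data -> fetch(es, dataProvider, data)
                        // hypothetical change: keep this level's data in front of the recursive results
                        .thenApply(rest -> Stream.concat(data == null ? Stream.<String>empty() : data.stream(), rest)), es))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
    }

    static List<String> run() {
        ExecutorService es = Executors.newFixedThreadPool(1); // a single thread is enough
        try {
            return fetch(es, url -> url.length() < 3 ? List.of(url + "x") : null, List.of("a", "b", "c"))
                    .join()                          // only the calling thread ever waits
                    .collect(Collectors.toList());
        } finally {
            es.shutdown();                           // let the JVM exit
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [ax, axx, bx, bxx, cx, cxx]
    }
}
```

No pool thread ever waits here: each recursive `fetch()` only registers further stages and returns, so the single worker thread just keeps draining its queue while the main thread waits in the final `join()`.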