Tags: java, spring, completable-future

How to correctly fan out a list in CompletableFutures


I'm trying to fan out from a method that returns a CompletableFuture<List> into another method that is called once per list element and itself returns a CompletableFuture. Afterwards I want to return a CompletableFuture.allOf built from the futures produced for the list.

In essence I have the following methods (assume they are in their own service classes; I just combined them here for brevity):

@Async
public CompletableFuture<List<File>> getMatchingCsvFrom(Path directory, Pattern pattern) {
    // ... some work here
}

@Async
public CompletableFuture<Boolean> processSingleCsv(File csvFile) {
    // ... some work here
}

And I'm trying to call them like this:

public CompletableFuture<Void> processDirectory(Path directory) {
    CompletableFuture<List<File>> matchingCsvFrom = fileProcessingService.getMatchingCsvFrom(directory, PROCESS_PATTERN);

    List<CompletableFuture<Boolean>> processFutures = matchingCsvFrom.get().stream()
            .map(file -> processService.processSingleCsv(file))
            .collect(Collectors.toList());
    return CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[0]));
}

The .get() is obviously a problem there, since it blocks the calling thread, but I'm not able to resolve it using .thenApply(), .thenAccept(), or .thenCompose().

Unfortunately, all the other answers I found want to accomplish the opposite (going from List<CompletableFuture> to CompletableFuture<List>). I'd appreciate any suggestions!


Solution

public CompletableFuture<Void> processDirectory(Path directory) {
    CompletableFuture<List<File>> matchingCsvFrom = fileProcessingService.getMatchingCsvFrom(directory, PROCESS_PATTERN);

    // thenCompose flattens the CompletableFuture returned by the lambda, so no blocking get() is needed
    return matchingCsvFrom.thenCompose(list -> {
        var processFutures = list.stream()
            .map(file -> processService.processSingleCsv(file))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(processFutures);
    });
}
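
For anyone who wants to try the pattern outside of Spring, here is a minimal, self-contained sketch. It rests on a few assumptions that are not part of the question: CompletableFuture.supplyAsync stands in for the @Async services, plain String file names replace File, and the names FanOutSketch, findFiles, processOne and processAll are made up for illustration. It also goes one step further than allOf alone and collects the individual Boolean results, which may or may not be what you need.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the two @Async services; all names are illustrative only.
class FanOutSketch {

    // Stand-in for getMatchingCsvFrom: asynchronously yields the list to fan out over.
    static CompletableFuture<List<String>> findFiles() {
        return CompletableFuture.supplyAsync(() -> List.of("a.csv", "b.csv", "broken.txt"));
    }

    // Stand-in for processSingleCsv: one asynchronous task per list element.
    static CompletableFuture<Boolean> processOne(String name) {
        return CompletableFuture.supplyAsync(() -> name.endsWith(".csv"));
    }

    // Fan out without blocking, then fan in to a list of the individual results.
    static CompletableFuture<List<Boolean>> processAll() {
        return findFiles().thenCompose(files -> {
            List<CompletableFuture<Boolean>> futures = files.stream()
                    .map(FanOutSketch::processOne)
                    .collect(Collectors.toList());
            // allOf completes only after every future has completed,
            // so the join() calls inside thenApply return immediately.
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(ignored -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        });
    }

    public static void main(String[] args) {
        // The only blocking call is here, at the very edge of the program.
        System.out.println(processAll().join()); // prints [true, true, false]
    }
}
```

Had thenApply been used in place of thenCompose in processAll, the result type would be a nested CompletableFuture<CompletableFuture<List<Boolean>>>, which is probably why thenApply did not fit in the original attempt.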