
How to use join for a stream of CompletableFuture


Suppose I have the following method:

public static Stream<CompletableFuture<String>> findPricesStream(String product) {}

This method looks for the cheapest price of a given product and returns a stream of CompletableFuture.

Now I want to react to each value in the stream as soon as it is available. To do that, I use thenAccept, and the implementation looks like this:

    public static void reactToEarliestResultWithRaw() {
        long start = System.nanoTime();
        CompletableFuture[] priceFuture = findPricesStream(WHATEVER_PRODUCT_NAME)
                .map(completableFuture -> completableFuture.thenAccept(
                        s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(priceFuture).join();
        System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
    }

With this implementation, I got the desired output

LetsSaveBig price is 151.227 (done in 5476 msecs)
BuyItAll price is 211.66 (done in 5747 msecs)
MyFavoriteShop price is 206.30200000000002 (done in 6968 msecs)
BestPrice price is 131.917 (done in 8110 msecs)
All shops have now responded in 8110 msecs

Next, I wanted to go a step further and make the code more readable, so I chained another map that joins each CompletableFuture:

    public static void reactToEarliestResultWithoutRaw() {
        long start = System.nanoTime();
        List<Void> completeFutures = findPricesStream(WHATEVER_PRODUCT_NAME)
                .map(completableFuture -> completableFuture.thenAccept(
                        s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
                .map(CompletableFuture::join)
                .toList();
        int size = completeFutures.size();
        if (isComplete(size)) {
            System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
        }
    }

    private static boolean isComplete(int size) {
        return size == shops.size();
    }

I got the output

BestPrice price is 123.17400000000002 (done in 2060 msecs)
LetsSaveBig price is 109.67200000000001 (done in 6025 msecs)
MyFavoriteShop price is 131.21099999999998 (done in 13860 msecs)
BuyItAll price is 164.392 (done in 18434 msecs)
All shops have now responded in 18434 msecs

The result surprised me! I expected the elapsed time to be roughly the same in both cases, but there is a huge difference.

Am I misunderstanding how to use join here?

Reference

The implementation comes from the book Modern Java in Action: Lambdas, streams, functional and reactive programming 2nd Edition and I have modified it a bit for the experiment.


Solution

  • The "surprising" results are due to how findPricesStream is implemented: it returns shops.stream().map(shop -> CompletableFuture.supplyAsync(...)). Because streams are lazy, no CompletableFuture is constructed until a terminal operation is applied to the returned stream. That happens in your own method, when you call .toList().

    The terminal operation toList() does this:

    1. For the first shop, it constructs a CompletableFuture, which starts running.
    2. The CompletableFuture is joined, i.e. the main thread waits until it is finished.
    3. Then the next CompletableFuture is constructed for the next shop, and so on.
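
    The order of these steps can be made visible with a small trace. This is only a minimal sketch: the class LazyJoinTrace, the helper lookup, and the shop names "A", "B", "C" are hypothetical stand-ins for the real shop code, and forEach is used as the terminal operation so that it also runs on Java versions without Stream.toList().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class LazyJoinTrace {

    // Hypothetical stand-in for a shop lookup: records the moment the future is created.
    static CompletableFuture<String> lookup(String shop, List<String> events) {
        events.add("create " + shop);
        return CompletableFuture.supplyAsync(() -> shop + " price");
    }

    static List<String> trace() {
        // Only the calling thread writes to this list, so a plain ArrayList is enough.
        List<String> events = new ArrayList<>();
        Stream.of("A", "B", "C")
                .map(shop -> lookup(shop, events))   // lazy: runs once per element, on demand
                .map(CompletableFuture::join)        // blocks before the next element is pulled
                .forEach(result -> events.add("joined " + result));
        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // create A
        // joined A price
        // create B
        // joined B price
        // create C
        // joined C price
    }
}
```

    Each future is created only after the previous one has been joined, so at most one lookup is ever in flight.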

    So the prices are calculated sequentially. To make the calculations run in parallel, create the list first (so that all futures are started) and then join them:

    List<CompletableFuture<Void>> futures = findPricesStream(WHATEVER_PRODUCT_NAME)
            .map(completableFuture -> completableFuture.thenAccept(
                    s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
            .toList();
    List<Void> results = futures.stream()
            .map(CompletableFuture::join)
            .toList();
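
    If the joined values are not needed, collecting the futures and waiting once with CompletableFuture.allOf is another option. The sketch below compares that with joining inside the stream. It is not the book's code: the class EagerVsLazyJoin, the helper slowPrice, the 200 ms delay, and the fixed thread pool are assumptions made only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerVsLazyJoin {
    // Hypothetical shops and delay; the real lookup is not shown in the question.
    static final List<String> SHOPS =
            Arrays.asList("BestPrice", "LetsSaveBig", "MyFavoriteShop", "BuyItAll");
    static final long DELAY_MS = 200;

    // Simulates a slow remote call.
    static String slowPrice(String shop) {
        try {
            Thread.sleep(DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shop + " price";
    }

    // Lazy: no future exists until a terminal operation pulls elements through.
    static Stream<CompletableFuture<String>> findPricesStream(ExecutorService executor) {
        return SHOPS.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> slowPrice(shop), executor));
    }

    // Joins inside the pipeline: each future is awaited before the next one is created.
    static long lazyJoinMillis(ExecutorService executor) {
        long start = System.nanoTime();
        findPricesStream(executor)
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Collects first, so every future is already running, then waits for all of them.
    static long eagerJoinMillis(ExecutorService executor) {
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures =
                findPricesStream(executor).collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(SHOPS.size());
        try {
            System.out.println("join inside the stream: " + lazyJoinMillis(executor) + " msecs");
            System.out.println("collect, then allOf:    " + eagerJoinMillis(executor) + " msecs");
        } finally {
            executor.shutdown();
        }
    }
}
```

    With one thread per shop, the first figure should come out at roughly four times the delay and the second at roughly one delay, which mirrors the difference between your two runs.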