Suppose I have the following method:
public static Stream<CompletableFuture<String>> findPricesStream(String product) {}
This method looks up prices for a given product and returns a stream of CompletableFuture.
Now I want to react to each value in the stream as soon as it is available. To do that, I use the method thenAccept, and the implementation could be as follows:
public static void reactToEarliestResultWithRaw() {
    long start = System.nanoTime();
    CompletableFuture[] priceFuture = findPricesStream(WHATEVER_PRODUCT_NAME)
            .map(completableFuture -> completableFuture.thenAccept(
                    s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
            .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(priceFuture).join();
    System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
}
With this implementation, I got the desired output:
LetsSaveBig price is 151.227 (done in 5476 msecs)
BuyItAll price is 211.66 (done in 5747 msecs)
MyFavoriteShop price is 206.30200000000002 (done in 6968 msecs)
BestPrice price is 131.917 (done in 8110 msecs)
All shops have now responded in 8110 msecs
Now I would like to go a step further and make the code more readable. I chained another map that is responsible for joining all of the CompletableFutures:
public static void reactToEarliestResultWithoutRaw() {
    long start = System.nanoTime();
    List<Void> completeFutures = findPricesStream(WHATEVER_PRODUCT_NAME)
            .map(completableFuture -> completableFuture.thenAccept(
                    s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
            .map(CompletableFuture::join)
            .toList();
    int size = completeFutures.size();
    if (isComplete(size)) {
        System.out.println("All shops have now responded in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
    }
}

private static boolean isComplete(int size) {
    return size == shops.size();
}
With this version, I got the following output:
BestPrice price is 123.17400000000002 (done in 2060 msecs)
LetsSaveBig price is 109.67200000000001 (done in 6025 msecs)
MyFavoriteShop price is 131.21099999999998 (done in 13860 msecs)
BuyItAll price is 164.392 (done in 18434 msecs)
All shops have now responded in 18434 msecs
The result surprised me! I expected the elapsed time to be roughly the same for both, but there is a huge difference.
Am I misunderstanding how join works here?
The implementation comes from the book Modern Java in Action: Lambdas, Streams, Functional and Reactive Programming, 2nd Edition, and I have modified it a bit for the experiment.
The "surprising" results are due to how findPricesStream is implemented: it returns shops.stream().map(shop -> CompletableFuture.supplyAsync(...)). Because streams are lazy, no CompletableFuture is constructed until a terminal operation is applied to the returned stream. That happens in your own method, once you call .toList().
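The laziness described above can be checked with a small, self-contained sketch. The class name, the shop names, and the counter are hypothetical and only serve the illustration; the real findPricesStream is assumed to behave the same way because it is also built from stream().map(...).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    /** Returns {futures created before the terminal operation, futures created after it}. */
    static int[] countCreatedFutures() {
        AtomicInteger created = new AtomicInteger();

        // Building the pipeline does not run the map lambda yet.
        Stream<CompletableFuture<String>> lazy = Stream.of("ShopA", "ShopB", "ShopC") // hypothetical shops
                .map(shop -> {
                    created.incrementAndGet();
                    return CompletableFuture.supplyAsync(() -> shop + " price");
                });
        int before = created.get();

        // The terminal operation pulls the elements through map, creating the futures.
        List<CompletableFuture<String>> futures = lazy.collect(Collectors.toList());
        int after = created.get();

        futures.forEach(CompletableFuture::join); // wait so no task is left running
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = countCreatedFutures();
        System.out.println("created before terminal operation: " + counts[0]);
        System.out.println("created after terminal operation:  " + counts[1]);
    }
}
```

The first count is 0 and the second is 3: the futures only come into existence while the terminal operation runs.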
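The terminal operation toList() pulls the shops through the pipeline one element at a time. For the first shop, it constructs a CompletableFuture, which starts running. The next step, map(CompletableFuture::join), then blocks until that future has completed. Only after that does the stream move on to the next shop and construct its CompletableFuture.
So the prices are calculated sequentially. To make the calculations run in parallel, create the list first (so that all futures are started) and then join them: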
// Terminal operation without join: every future is created and started here.
List<CompletableFuture<Void>> futures = findPricesStream(WHATEVER_PRODUCT_NAME)
        .map(completableFuture -> completableFuture.thenAccept(
                s -> System.out.println(s + " (done in " + (System.nanoTime() - start) / 1_000_000 + " msecs)")))
        .toList();
// All futures are already running, so joining them now only waits for the slowest one.
List<Void> results = futures.stream()
        .map(CompletableFuture::join)
        .toList();
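To see the difference between the two approaches without the book's shop classes, here is a minimal sketch that measures how many lookups are in flight at the same time. Everything in it is an assumption made for illustration: fakePricesStream is a hypothetical stand-in for findPricesStream, the shop names and the 200 ms delay are made up, and an explicit fixed thread pool is used so that the result does not depend on the size of the common pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class JoinOrderDemo {
    static final List<String> SHOPS = List.of("ShopA", "ShopB", "ShopC", "ShopD"); // hypothetical

    // Hypothetical stand-in for findPricesStream: one lazily created future per shop.
    static Stream<CompletableFuture<String>> fakePricesStream(
            ExecutorService pool, AtomicInteger running, AtomicInteger peak) {
        return SHOPS.stream().map(shop -> CompletableFuture.supplyAsync(() -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
            sleep(200);                            // simulated remote call
            running.decrementAndGet();
            return shop + " price";
        }, pool));
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the peak number of lookups that were running at the same time. */
    static int peakConcurrency(boolean joinInsidePipeline) {
        ExecutorService pool = Executors.newFixedThreadPool(SHOPS.size());
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            Stream<CompletableFuture<String>> prices = fakePricesStream(pool, running, peak);
            if (joinInsidePipeline) {
                // Each future is joined before the next one is even created.
                prices.map(CompletableFuture::join).collect(Collectors.toList());
            } else {
                // Start every future first, then wait for all of them.
                List<CompletableFuture<String>> started = prices.collect(Collectors.toList());
                started.forEach(CompletableFuture::join);
            }
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("join inside the pipeline, peak concurrency: " + peakConcurrency(true));
        System.out.println("collect first, then join, peak concurrency: " + peakConcurrency(false));
    }
}
```

With join inside the pipeline the peak is 1, because only one lookup ever exists at a time. With collect-then-join the lookups overlap, so the peak should be greater than 1 (up to 4 with this pool size), which is why the total elapsed time drops to roughly that of the slowest shop.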