
How to efficiently use CompletableFuture to map async task per input


I want to return a map from each key to the API response for that key. I am using CompletableFuture and Guava for this. Below is my attempt. Is there a more standard way to achieve the same with Java 8 and its threading APIs?

The map is id -> apiResponse(id).

    
    public static List<String> returnAPIResponse(Integer key) {
        return Lists.newArrayList(key.toString() + " Test");
    }

    public static void main(String[] args) {
        List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);

        List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
            .stream()
            .map(key -> CompletableFuture.supplyAsync(
                () -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
            .collect(Collectors.toList());

        System.out.println(
            futures.parallelStream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

    }


Solution

  • There is an interesting behavior here that I will try my best to explain. Let's start simple: forget about CompletableFuture for a second and do this with a plain parallelStream, with a minor debugging step added:

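    List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
    
    // parallelStream runs on the common pool by default; keep a handle to inspect it
    ForkJoinPool pool = ForkJoinPool.commonPool();
    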
    Map<Integer, List<String>> result =
        keys.parallelStream()
            .map(x -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
    System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
    

    On my machine, this prints:

    parallelism : 11 current : 11
    

    I assume you already know that the actions of a parallelStream are executed in the common ForkJoinPool. It is probably also obvious what that output means: 11 threads were available and all of them were used.
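
    To see where the parallelism figure comes from on your own machine, a minimal sketch along these lines may help. The class name CommonPoolInfo and the helper commonParallelism are made up for illustration. By default the common pool's parallelism is typically one less than the number of available processors, and it can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property, so your numbers will likely differ from mine.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Configured parallelism of the common pool, which parallel streams use by default.
    public static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors : " + cores);
        System.out.println("common parallelism   : " + commonParallelism());
        // Worker threads are started lazily, so this can still be 0
        // if no parallel work has been submitted yet.
        System.out.println("current pool size    : " + ForkJoinPool.commonPool().getPoolSize());
    }
}
```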

    I'll slightly modify your example now:

    List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
    
    ForkJoinPool pool = ForkJoinPool.commonPool();
    ExecutorService supplyPool = Executors.newFixedThreadPool(2);
    
    Map<Integer, List<String>> result =
    keys.parallelStream()
        .map(x -> CompletableFuture.supplyAsync(
                 () -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
                 supplyPool
        ))
        .map(CompletableFuture::join)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
    System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
    

    There is really just one important change: I let your supplyAsync run in its own thread pool; the rest is the same. Running this reveals:

    parallelism : 11 current : 16
    

    Surprise: more threads were created than we wanted. Well, the documentation of getPoolSize says:

    Returns the number of worker threads that have started but not yet terminated. The result returned by this method may differ from getParallelism when threads are created to maintain parallelism when others are cooperatively blocked.

    The blocking in your case happens via map(CompletableFuture::join). You have effectively blocked a worker thread of the ForkJoinPool, and the pool compensates for that by spinning up another one.


    If you want to avoid such a surprise:

    List<CompletableFuture<AbstractMap.SimpleEntry<Integer, List<String>>>> list =
    keys.stream()
        .map(x -> CompletableFuture.supplyAsync(
             () -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
             supplyPool
         ))
        .collect(Collectors.toList());
    
    CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
    
    Map<Integer, List<String>> result =
    list.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    

    Because there is no join on the worker threads of the ForkJoinPool, you can drop parallelStream. I still block to get the result via:

    CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
    

    but no compensating threads will be created. And because CompletableFuture.allOf returns a CompletableFuture<Void>, I need to stream again to get the results.

    Don't let that .map(CompletableFuture::join) in the last stream operation fool you: it does not block, because the preceding CompletableFuture.allOf(...).join() has already waited for all tasks to finish.
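
    For completeness, here is a self-contained sketch of the same idea without Guava. It is a variant, not the exact code above: instead of blocking on allOf(...).join() and then streaming, it chains thenApply onto allOf so the caller gets a single CompletableFuture of the finished map. The names AllOfExample, fetchOne and fetchAllAsync are made up for illustration, and returnAPIResponse is the stand-in from the question.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfExample {

    // Stand-in for the real API call, mirroring returnAPIResponse from the question.
    public static List<String> returnAPIResponse(Integer key) {
        return Collections.singletonList(key + " Test");
    }

    // Starts one async task for a single key on the given executor.
    public static CompletableFuture<Map.Entry<Integer, List<String>>> fetchOne(
            Integer key, Executor pool) {
        return CompletableFuture.supplyAsync(
            () -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key)), pool);
    }

    // Starts one task per key and returns a future that completes with the full map.
    public static CompletableFuture<Map<Integer, List<String>>> fetchAllAsync(
            List<Integer> keys, Executor pool) {

        // A sequential stream is enough here: supplyAsync returns immediately.
        List<CompletableFuture<Map.Entry<Integer, List<String>>>> futures = keys.stream()
            .map(key -> fetchOne(key, pool))
            .collect(Collectors.toList());

        // allOf completes once every task has completed; it carries no value itself.
        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // This function only runs after all futures are complete,
        // so each join() returns without waiting.
        return allDone.thenApply(ignored -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        ExecutorService supplyPool = Executors.newFixedThreadPool(2);
        try {
            // The only blocking call, made on the main thread rather than a ForkJoinPool worker.
            Map<Integer, List<String>> result =
                fetchAllAsync(Arrays.asList(1, 2, 3, 4), supplyPool).join();
            System.out.println(result);
        } finally {
            // Non-daemon pool threads would otherwise keep the JVM alive.
            supplyPool.shutdown();
        }
    }
}
```

    Whether you block with allOf(...).join() or chain thenApply is mostly a matter of what the caller needs; the point in both cases is that no ForkJoinPool worker sits blocked inside join.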