Search code examples
javamultithreadingconcurrencyjava.util.concurrentcompletable-future

Not able to get the result of completable future into the response object


I am calling four APIs using the CompletableFuture interface of java.util.concurrent with Java8. I want to do multiple rest calls, combine the results and return a JSON.

I have tried with the Future, it was working for me. Then I wanted to try CompletableFuture as well.

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    return response;
}

private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
    if(s.size() > 0) {
        for (Map<String, Object> maps : s) {
            if (maps != null && maps.containsKey("abcd")) {
                String abcd = maps.get("abcd").toString();
                if(!response.containsKey(abcd))
                    response.put(maps.get("abcd").toString(), maps);
                else {
                    for (Map.Entry<String, Object> entry: maps.entrySet()) {
                        response.get(abcd).put(entry.getKey(), entry.getValue());
                    }
                }
            }
        }
    }
}

I am calling four Apis. I am getting a List of Hashmap as a response for each API. Now, the result of each API needs to be clubbed into one response that is, Maps of maps. So, I want, as I get the response from an API, I put that result into the hashmap.

But what I am getting is an empty response. Here completable future calls the API but the server does not wait for the call response and returns. How to let the server wait? Please suggest a method so that I can execute this use-case with completableFuture. Also, suggest some cleaner way to do it.

Response getting from the apis:

{ 
   {
    "abcd": 1,
    "cde": 2
   },
   { 
    "abcd": 2,
     "cde": 3
   }
}

Parse the above response to:

{
   "1" : {
    "abcd": 1,
    "cde": 2
   },
   "2":{ 
    "abcd": 2,
     "cde": 3
   }

}

Solution

  • I think that your problem lies in the fact that the CompletableFuture.supplyAsync() is not blocked so your code instantly moves forward, instead of waiting for the async operation to be applied.

    The code is executed in an asynchronous manner, which in this case means that, you tell the operations inside of the CompletableFuture.supplyAsync() to be performed, and you move further. The thenApply() part is invoked only, when the code inside supplyAsync finishes execution, which most probably happens after you have already returned the response.

    If you want to wait for all CompletableFutures to finish execution before returning the response, then you should use the CompletableFuture.join() method.

    First try to refactor your code, to get rid of thenApply() part, so that your ~CompletableFutures` result in the partial responses.

    Then assign all CompletableFutures to some variables (myFirstFuture, mySecondFuture etc in my example).

    After that using a Stream apply a join() method on all the CompletableFutures and apply your putInResponse method for each of the results.

    For example:

    public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {
    
        Map<String, Map<String, Object>> response = new HashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(6);
        CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
            List<Template> keys = new ArrayList<>();
            keys.add("mani");
            return aClient.transform(keys, requestObj);
        }, executor);
    
        CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
            List<Template> keys = new ArrayList<>();
            keys.add("gani");
            return bClient.transform(keys, requestObj);
        }, executor);
    
        CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
            List<Template> keys = new ArrayList<>();
            keys.add("priya");
            return cClient.transform(keys, requestObj);
        }, executor);
    
        CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
            List<Template> keys = new ArrayList<>();
            keys.add("ravi");
            return dClient.transform(keys, requestObj);
        }, executor);
    
        Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
            .map(CompletableFuture::join)
            .filter(Objects::nonNull)
            .forEachOrdered(s -> putIntoResponse(response, s));
    
    
        return response;
    }