After spending a day learning about the Java Concurrency API, I still don't quite get how I could create the following functionality with the help of the CompletableFuture and ExecutorService classes:
When I get a request on my REST endpoint I need to:
I have the building blocks (methods like getMatchingObjectsFromDB(callerPayload), getURLs(resultOfgetMachingObjects), sendHttpRequest(Url, methodType), etc.) written for these already; I just can't quite figure out how to tie step 1 and step 3 together. I would use CompletableFuture.supplyAsync() for step 1, and then I would need the CompletableFuture.thenCompose method to start step 3, but it's not clear to me how parallelism can be achieved with this API. It is rather intuitive with ExecutorService executor = Executors.newWorkStealingPool(); though, which creates a thread pool sized to the number of available processors, and the tasks can be submitted via the invokeAll() method.
How can I use CompletableFuture and ExecutorService together? Or how can I guarantee parallel execution of a list of tasks with CompletableFuture? A demonstrating code snippet would be much appreciated. Thanks.
You should use join() to wait for all of the futures to finish.
Create a Map<String, Boolean> result to store the outcome of each request.
In your controller:
public void yourControllerMethod() {
    // Run the service call asynchronously so the controller returns immediately
    CompletableFuture.runAsync(() -> yourServiceMethod());
}
In your service:
// Execute your logic to get List<String> urls
List<CompletableFuture<Void>> futures = urls.stream()
    // supplyAsync takes a no-argument Supplier, so capture url from the stream lambda
    .map(url -> CompletableFuture.supplyAsync(() -> requestUrl(url))
        .thenAcceptAsync(requestResult ->
            result.put(url, true /* or false, depending on requestResult */)))
    .collect(toList()); // You now have a list of CompletableFutures
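Then use .join() to wait for all of the futures to complete (remember that your service method is already running on its own thread). Note that allOf() takes an array, so the list has to be converted:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();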
Then you can determine which requests succeeded or failed by reading the result map.
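Putting the pieces above together, a minimal self-contained sketch might look like the following. The class name FanOutSketch and the method requestAll are made up for illustration, and requestUrl here is only a hypothetical stand-in that performs no real HTTP call; the rule that decides success is an assumption, not part of the original code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class FanOutSketch {

    // Hypothetical stand-in for the real HTTP call; no network is involved.
    // Assumption for illustration: only https URLs count as a success.
    static boolean requestUrl(String url) {
        return url.startsWith("https://");
    }

    static Map<String, Boolean> requestAll(List<String> urls) {
        // Thread-safe map, because the callbacks may run on different threads.
        Map<String, Boolean> result = new ConcurrentHashMap<>();

        // Start one asynchronous task per URL and record its outcome.
        List<CompletableFuture<Void>> futures = urls.stream()
            .map(url -> CompletableFuture
                .supplyAsync(() -> requestUrl(url))
                .thenAccept(ok -> result.put(url, ok)))
            .collect(Collectors.toList());

        // allOf() takes an array; join() blocks until every future is done.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(requestAll(List.of("https://a.example", "http://b.example")));
    }
}
```

Because join() is only called after every task has been submitted, the requests can overlap instead of running one after another.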
Edit
Please post the code you ended up with so that others can follow along too.
I've read your code, and here are the modifications it needs:
When this for loop was not commented out, the receiver webserver got the same request twice. I don't understand the purpose of this for loop.
Sorry, I did not clean up my previous answer. That loop was just a temporary idea in my head that I forgot to remove at the end :D
Just remove it from your code.
// allOf() only accepts arrays, so the List needed to be converted /* The code never gets over this part (I know allOf() is a blocking call), even long after when the receiver got the HTTP request
with the correct payload. I'm not sure yet where exactly the code gets stuck */
Your map should be a ConcurrentHashMap because the callbacks modify it concurrently later:
Map<String, Boolean> result = new ConcurrentHashMap<>();
If your code still does not work as expected, I suggest removing the parallelStream() part.
By default, the async methods of CompletableFuture and parallelStream() both run on the common ForkJoinPool (ForkJoinPool.commonPool()). I think that pool is exhausted.
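To check this on your own machine, a small diagnostic sketch like the one below may help. The class name CommonPoolSketch and the helper asyncThreadName are made up for illustration. It prints how many worker threads the common pool targets and which thread a default supplyAsync() task actually ran on; the exact values depend on the JVM and the number of cores.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CommonPoolSketch {

    // Returns the name of the thread that executed a supplyAsync() task
    // submitted without an explicit executor.
    static String asyncThreadName() {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        // Target parallelism of the shared pool; this varies per machine.
        System.out.println("common pool parallelism: "
            + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("supplyAsync ran on: " + asyncThreadName());
    }
}
```

If the reported parallelism is small and the tasks block on network I/O, a handful of slow requests can occupy every worker, which is consistent with the guess above.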
You should also create your own pool for your CompletableFuture tasks:
Executor pool = Executors.newFixedThreadPool(10);
And execute your request using that pool:
CompletableFuture.supplyAsync(yourTask, pool).thenAcceptAsync(yourCallback, pool);
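As a rough sketch of how a dedicated pool and thenCompose could tie a lookup stage to a parallel request stage, consider the following. The class name PoolSketch and the methods loadUrls, requestUrl and run are hypothetical stand-ins and not the asker's real methods; no database or network is touched, and the success rule is an assumption for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class PoolSketch {

    // Hypothetical stand-in for the lookup that produces the URLs.
    static List<String> loadUrls() {
        return List.of("https://a.example", "https://b.example", "http://c.example");
    }

    // Hypothetical stand-in for the HTTP call; only https counts as success.
    static boolean requestUrl(String url) {
        return url.startsWith("https://");
    }

    static Map<String, Boolean> run(ExecutorService pool) {
        Map<String, Boolean> result = new ConcurrentHashMap<>();

        CompletableFuture<Void> pipeline = CompletableFuture
            // First stage: fetch the URLs on the dedicated pool.
            .supplyAsync(PoolSketch::loadUrls, pool)
            // Second stage: fan out one task per URL, also on the pool,
            // and hand back a single future that completes when all are done.
            .thenCompose(urls -> {
                List<CompletableFuture<Void>> futures = urls.stream()
                    .map(url -> CompletableFuture
                        .supplyAsync(() -> requestUrl(url), pool)
                        .thenAcceptAsync(ok -> result.put(url, ok), pool))
                    .collect(Collectors.toList());
                return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            });

        pipeline.join(); // Block the caller until the whole pipeline finishes.
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(run(pool));
        } finally {
            pool.shutdown(); // Otherwise the non-daemon pool threads keep the JVM alive.
        }
    }
}
```

Nothing inside the pool blocks on another pool task here: the thenCompose callback only submits work and returns, so the pipeline should not deadlock even with a small pool.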