I am trying to chain tasks in CompletableFuture to execute them in parallel and get the best performance and resource utilization.
The workflow I am writing has five stages and each stage depends on the result of the previous stages.
The workflow execution is as follows:
* Stage-1: Create CustomerContext -> This will call downstream services and return Context object.
* Stage-2: Generate Candidates for the customer -> This stage generate candidates and returns a List<Candidate> object. This stage requires CustomerContext as a parameter to generate the candidates.
* Stage-3: Process Candidate -> In this stage I process the candidates and returns processed List<Candidate>. This stage requires CustomerContext and List<Candidate> object from stage-1 and stage-2
* Stage-4: Rank candidates -> In this stage I perform candidate ranking and returns ranked List<Candidate> object. This stage requires CustomerContext and List<Candidate> object from stage-1 and **stage-3**
* Stage-5: Generate Result -> In this stage I generate the final result and returns Result object by making downstream API calls., This stage requires CustomerContext and List<Candidate> object from **stage-1** and **stage-4**
I can create a result holder object to hold each stage's results. However, I am not sure if that is the best solution.
Is CompletableFuture the best solution for this use case? What would be the optimal way to chain these stages?
Use thenCompose
to compose the results from different futures which depend on each other:
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "result 1")
.thenCompose(result1 ->
CompletableFuture.supplyAsync(result1::length)
.thenCompose(strLen ->
CompletableFuture.supplyAsync(strLen::toString)));
Note though, since each future depend on the result of the previous one, there is no way in executing them in parallel. Each future will still execute after the previous has completed. If the futures do not depend on each other, you can run them in parallel by "unearthing the hidden applicative".
Also note, CompletableFuture
only has supplyAsync
and runAsync
taking a Supplier
and a Runnable
, respectively and there is no direct way of dealing with asynchronous actions that might throw an exception (e.g. running Callable
). If you need to do that, you have to delegate to a helper future as described in this answer.