I've written a method like the one below:
public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
    String content,
    RequestContext context,
    MetricsClient metricsClient,
    JdbcSession jdbcSession) {
  AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
  templateTreeHolder.set(Json.rootNode());
  return getTemplateIds(context, metricsClient, jdbcSession, content)
      .thenCompose(
          templateIds -> {
            templateIds.map(
                id ->
                    // do something and return CompletionStage<String>
                    .thenAccept(
                        tree -> {
                          templateTreeHolder.set(
                              (ObjectNode)
                                  templateTreeHolder.get().set(id, Json.readTree(tree)));
                          System.out.println(
                              "From inner function: " + templateTreeHolder.get());
                        }));
            return CompletableFuture.completedFuture(NotUsed.getInstance());
          })
      .thenApply(
          notUsed -> {
            String includedTemplateIdsStr =
                getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
            System.out.println("From outer function: " + templateTreeHolder.get());
            return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
          });
}
I expect the inner block to run and update templateTreeHolder before .thenApply is invoked, so that templateTreeHolder holds the correct data to return. However, the .thenApply block runs before the inner .thenAccept block.
The console output shows this order:
From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}
I'm not sure what I'm doing wrong when chaining the CompletionStages. How can I make sure that the inner block completes before the outer block runs?
The function you pass to thenCompose returns an already completed future, i.e. return CompletableFuture.completedFuture(NotUsed.getInstance());, which allows the dependent stages to proceed immediately. This conflicts with the evaluation of the function passed to templateIds.map(…), which apparently happens asynchronously, and whose resulting stages are simply discarded.
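To make the ordering problem concrete, here is a minimal sketch using only java.util.concurrent. The names CompletedFutureOrderDemo, run and pendingWork are made up for illustration, and pendingWork merely stands in for the asynchronous "do something" step of the question; it is not the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CompletedFutureOrderDemo {

    // Returns the order in which the "inner" and "outer" callbacks ran.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Hypothetical stand-in for the asynchronous work; it is completed manually below.
        CompletableFuture<String> pendingWork = new CompletableFuture<>();

        CompletableFuture.completedFuture("ids")
            .thenCompose(ids -> {
                // The stage returned by thenAccept is discarded, as in the question.
                pendingWork.thenAccept(tree -> events.add("inner"));
                // Already complete, so dependent stages do not wait for pendingWork.
                return CompletableFuture.completedFuture("not used");
            })
            .thenApply(notUsed -> {
                events.add("outer");
                return "done";
            });

        // The asynchronous work only finishes now, after "outer" has already run.
        pendingWork.complete("{}");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [outer, inner]
    }
}
```

Everything here runs on the calling thread, so the order is deterministic: the outer callback fires as soon as the completed future is returned, regardless of whether the inner work has finished.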
Generally, you should avoid mixing completion stages with dependencies on side effects like this, especially when the asynchronous evaluation producing those side effects is not modeled as a prerequisite completion stage.
But you can work around this if you have no other choice:
return getTemplateIds(context, metricsClient, jdbcSession, content)
    .thenCompose(
        templateIds -> {
          // create an initially uncompleted stage
          CompletableFuture<Object> subStage = new CompletableFuture<>();
          templateIds.map(
              id ->
                  // do something and return CompletionStage<String>
                  .thenAccept(
                      tree -> {
                        templateTreeHolder.set(
                            (ObjectNode)
                                templateTreeHolder.get().set(id, Json.readTree(tree)));
                        System.out.println(
                            "From inner function: " + templateTreeHolder.get());
                        // complete when all work has been done
                        subStage.complete(null);
                      }));
          // use this stage for dependent actions
          return subStage;
        })
    .thenApply(
        notUsed -> {
          String includedTemplateIdsStr =
              getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
          System.out.println("From outer function: " + templateTreeHolder.get());
          return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
        });
In the code above, the future will never get completed if your action fails with an exception before the completion attempt. The general pattern would be like this:
CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
    code that will eventually call complete on stage
}
catch(Throwable t) {
    stage.completeExceptionally(t);
}
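As a runnable instance of this pattern, the following sketch guards a synchronous computation. The class GuardedCompletionDemo and the helper parseAsStage are hypothetical names chosen for illustration; Integer.parseInt just plays the role of "code that may throw before completing the stage".

```java
import java.util.concurrent.CompletableFuture;

class GuardedCompletionDemo {

    // Hypothetical helper: parses text to an int and reports the outcome via the stage.
    static CompletableFuture<Integer> parseAsStage(String text) {
        CompletableFuture<Integer> stage = new CompletableFuture<>();
        try {
            // Code that eventually calls complete on the stage; parseInt may throw first.
            stage.complete(Integer.parseInt(text));
        } catch (Throwable t) {
            // Without this, a failure would leave the stage uncompleted forever.
            stage.completeExceptionally(t);
        }
        return stage;
    }

    public static void main(String[] args) {
        System.out.println(parseAsStage("42").join()); // prints 42
        System.out.println(parseAsStage("oops").isCompletedExceptionally()); // prints true
    }
}
```

Either way the caller gets a stage that is guaranteed to complete, so dependent actions are never left hanging.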
Of course, it gets a bit more complicated when the code that is supposed to complete the stage itself involves asynchronous processing: you then have to guard both the code that submits the completion action and the completion action itself.
So a more elaborate version of the inner code would look like this:
CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
  templateIds.map(
      id ->
          // do something and return CompletionStage<String>
          .thenAccept(
              tree -> {
                templateTreeHolder.set(
                    (ObjectNode)
                        templateTreeHolder.get().set(id, Json.readTree(tree)));
                System.out.println(
                    "From inner function: " + templateTreeHolder.get());
              })
          .whenComplete((v,t) -> {
            // complete when all work has been done
            if(t != null) subStage.completeExceptionally(t);
            else subStage.complete(v);
          }));
} catch(Throwable t) {
  subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;
(Perhaps the “do something and return CompletionStage” part has to be guarded with try { … } catch(Throwable t) { subStage.completeExceptionally(t); } too.)
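One caveat with the workaround: subStage is completed by whichever per-id action finishes first, and it is never completed if templateIds is empty. If you can restructure the code, an alternative is to keep the stage produced for each id and combine them all with CompletableFuture.allOf. The sketch below assumes plain java.util collections instead of the collection type used in the question, and loadTree is a hypothetical stand-in for the “do something and return CompletionStage<String>” part.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class AllOfDemo {

    // Hypothetical stand-in for "do something and return CompletionStage<String>".
    static CompletableFuture<String> loadTree(String id) {
        return CompletableFuture.supplyAsync(() -> "tree-of-" + id);
    }

    static CompletableFuture<Map<String, String>> loadAll(List<String> ids) {
        // Thread-safe map, since the per-id callbacks may run on different threads.
        Map<String, String> trees = new ConcurrentHashMap<>();
        // Keep every per-id stage instead of discarding it.
        List<CompletableFuture<Void>> stages = ids.stream()
            .map(id -> loadTree(id).thenAccept(tree -> trees.put(id, tree)))
            .collect(Collectors.toList());
        // allOf completes once every stage has completed, exceptionally if any of them failed.
        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> trees);
    }

    public static void main(String[] args) {
        Map<String, String> trees = loadAll(List.of("a", "b")).join();
        System.out.println(trees.get("a")); // prints tree-of-a
        System.out.println(trees.size());   // prints 2
    }
}
```

Returning the combined stage from the function passed to thenCompose makes the dependent thenApply wait for all ids, and an empty list yields an immediately completed stage rather than one that hangs.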