I face some strange behavior while using completableFutures See my code snippet below for easy and better understanding. I iterate through list of messages and call handleMessage on each message. handleMessage method first calls getDataContentIdByService and finally mappingService.process method is called.
But the issue is once getDataContentIdByService
method processing is done, before execution of mappingService.process method is complete, the call is returned back to whenComplete stage.
What i want is both getDataContentIdByService and mappingService.process methods should finish the execution in sequence and then whenComplete stage should be called.
Is something wrong in my code .. or? can someone help?
Completablefuture.allOf(messages.getList().stream()
.filter(this::msgOkOrLog)
.filter(this::notOnDenyList)
.map(msg -> handleMessage(msg, messages.getTrackingIdentifier(), messages.getMessageType()))
.toArray(CompletableFuture<?>[]::new))
.whenComplete((input, exception) -> countErrorsInContainer(messages));
The handleMessage function
protected CompletableFuture<Void> handleMessage(InternalProxyMessage message,
TrackingIdentifier containerTid, MessageType messageType) {
return getDataContentIdByService(message)
.thenAccept(message::setContentId)
.thenAccept(
mappingService.process(message)
.exceptionally(
ex -> {
throw new InternalServerErrorException("Unmanaged Error", ex);
}
})));
}
Your example could be greatly simplified:
public static void main(String[] args) {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "initiated");
CompletableFuture<Void> cf2 = cf1.thenAccept(x -> {
CompletableFuture<String> response = composed(x);
return;
});
System.out.println("Is it done : " + cf2.isDone());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
}
private static CompletableFuture<String> composed(String s) {
return CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("composed is actually finished");
return "done";
});
}
When cf1
is completed, thenAccept
will be called; but of course since composed
is called in a different thread (that CompletableFuture::supplyAsync
), return
statement will be hit before it will be finished. Running the example above reveals:
Is it done : true
composed is actually finished
On the other hand, if you change your code to :
CompletableFuture<String> cf2 = cf1.thenCompose(x -> {
CompletableFuture<String> response = composed(x);
return response;
});
You are now returning the result of that composed
. So when cf1
is completed, thenCompose
will be called; so cf2
will be completed only when the inner future will.