I am using ThreadPoolExecutor.submit to create multiple futures that write to a database in parallel.
for(BusinessElement el : elements){ list.add(threadPoolExecutor.submit(() -> createElement(el))); }
What I want is this: when one submitted task fails, the execution of all active threads should stop and the foreach loop should exit.
Is there an elegant solution?
If you can, use structured concurrency:
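try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    ...
    scope.fork(() -> createElement(el));
    scope.join();           // waits for all forks; the rest are cancelled once one fails
    scope.throwIfFailed();  // rethrows the first failure, if any
}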
If structured concurrency isn't an option, use tascalate-concurrent. It's an implementation of CompletionStage that is well suited for I/O tasks, is interruptible, and cascades cancellations, none of which can be said for CompletableFuture (itself an implementation of CompletionStage).
If you don't care about interruption, CompletableFuture is enough:
CompletableFuture<?>[] futures = elements.stream()
    .map(el -> CompletableFuture.supplyAsync(() -> createElement(el), threadPoolExecutor))
    .toArray(CompletableFuture<?>[]::new);
CompletableFuture<Void> allDone = CompletableFuture.allOf(futures);
allDone.join(); // waits for all tasks to finish; throws CompletionException if any of them failed
After this, you can check whether allDone succeeded or failed, e.g. with allDone.isCompletedExceptionally(); it fails if any of the subtasks failed. Note that join() itself throws a CompletionException in that case, so catch it if you want to inspect the future afterwards. You can also extract the individual results from the futures array. But: all tasks will always be submitted, and no task will ever be interrupted or aborted. If that's an issue, use tascalate-concurrent instead. Remember that you can always turn a CompletionStage into a CompletableFuture via toCompletableFuture().
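If interruption is not needed but queued work should still be skipped after the first failure, a shared flag can approximate that on top of CompletableFuture. The following is only a sketch under assumptions: FailFastDemo and runAll are hypothetical names, the task is modelled as a Consumer rather than the createElement method from the question, and only unchecked exceptions are treated as failures. Tasks that are already running when the failure happens still run to completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class FailFastDemo {

    /**
     * Runs task for each element on the given executor (hypothetical helper).
     * Returns true if every task succeeded, false if at least one threw.
     */
    static <T> boolean runAll(List<T> elements, Consumer<T> task, Executor executor) {
        AtomicBoolean failed = new AtomicBoolean(false);
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (T el : elements) {
            if (failed.get()) {
                break; // a failure is already known: stop submitting
            }
            futures.add(CompletableFuture.runAsync(() -> {
                if (failed.get()) {
                    return; // submitted before the failure: skip the real work
                }
                try {
                    task.accept(el);
                } catch (RuntimeException e) {
                    failed.set(true); // signal all other tasks to skip
                    throw e;          // completes this future exceptionally
                }
            }, executor));
        }

        try {
            // Waits for all submitted futures; skipped tasks return immediately.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return true;
        } catch (CompletionException e) {
            return false; // at least one task failed
        }
    }
}
```

With the names from the question, a call could look like FailFastDemo.runAll(elements, el -> createElement(el), threadPoolExecutor). The flag only prevents work from starting; it does not abort a database write that is already in progress.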
Is it possible to add a callback to a future object to get notified when it has failed?
Sure. There are many callbacks on CompletableFuture, e.g.
future.exceptionally(exception -> ...)       // handle exception
future.exceptionallyAsync(exception -> ...)
future.handle((result, exception) -> ...)    // handle all results
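To make the callbacks above concrete, here is a minimal sketch. CallbackDemo, withFallback, describe, and unwrap are hypothetical names chosen for illustration. One detail worth knowing: when a task started with supplyAsync throws, the callback typically receives the exception wrapped in a CompletionException, so the sketch unwraps it before reading the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CallbackDemo {

    // Returns the original cause if the exception is a CompletionException wrapper.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // exceptionally: called only on failure; supplies a replacement value.
    static CompletableFuture<String> withFallback(CompletableFuture<String> future, String fallback) {
        return future.exceptionally(ex -> fallback);
    }

    // handle: called on success and on failure; exactly one of result/ex is meaningful.
    static CompletableFuture<String> describe(CompletableFuture<String> future) {
        return future.handle((result, ex) ->
                ex == null ? "ok: " + result : "failed: " + unwrap(ex).getMessage());
    }
}
```

Both methods return a new future and leave the original untouched, so the original can still be passed to CompletableFuture.allOf or inspected with isCompletedExceptionally().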