Tags: java, concurrency, java-stream, forkjoinpool

Using ForkJoinPool on a set of documents


I have never used a ForkJoinPool and I came across this code snippet.

I have a Set<Document> docs. Document has a write method. If I do the following, do I need to have a get or join to ensure that all the docs in the set have correctly finished their write method?

ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
pool.submit(() -> docs.parallelStream().forEach(
    doc -> {
        doc.write();
    })
);

What happens if one of the docs is unable to complete its write? Say it throws an exception. Does the code given wait for all the docs to complete their write operation?


Solution

  • ForkJoinPool.submit(Runnable) returns a ForkJoinTask representing the pending completion of the task. If you want to wait for all documents to be processed, you need some form of synchronization with that task, like calling its get() method (from the Future interface).
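
For example, a minimal sketch of that synchronization (the Document class here is a hypothetical stand-in for the one in the question, with a flag added so the effect is observable):

```java
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class WriteAll {
    // Hypothetical stand-in for the Document class from the question
    static class Document {
        final AtomicBoolean written = new AtomicBoolean(false);
        void write() { written.set(true); }
    }

    static void writeAll(Set<Document> docs, int concurrencyLevel)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
        try {
            ForkJoinTask<?> task =
                pool.submit(() -> docs.parallelStream().forEach(Document::write));
            // Blocks until the whole stream pipeline is done; if any write
            // threw, get() rethrows it wrapped in an ExecutionException.
            task.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

Without the get(), the submitting thread continues immediately and has no guarantee that any write has finished.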

    Concerning exception handling: as usual, any exception thrown during stream processing will stop it. However, you have to refer to the documentation of Stream.forEach(Consumer):

    The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. […]

    This means that you have no guarantee of which documents will already have been written when an exception occurs. The processing will stop, but you cannot control which documents were still processed.

    If you want to make sure that the remaining documents are processed, I would suggest two solutions:

    • surround the document.write() with a try/catch to make sure no exception propagates, but this makes it difficult to check which document succeeded or if there was any failure at all; or
    • use another solution to manage your parallel processing, like the CompletableFuture API. As noted in the comments, your current solution is a hack that works thanks to implementation details, so it would be preferable to do something cleaner.
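
A sketch of the first option, collecting failures in a thread-safe queue so the remaining documents still run (Document and its failure condition are illustrative, not from the question):

```java
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class WriteAllCollectingFailures {
    // Illustrative Document: writes named "bad..." are assumed to fail
    static class Document {
        final String name;
        Document(String name) { this.name = name; }
        void write() {
            if (name.startsWith("bad")) {
                throw new RuntimeException("write failed: " + name);
            }
        }
    }

    static List<Document> writeAll(Set<Document> docs, int concurrencyLevel)
            throws InterruptedException, ExecutionException {
        Queue<Document> failed = new ConcurrentLinkedQueue<>();
        ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
        try {
            pool.submit(() -> docs.parallelStream().forEach(doc -> {
                try {
                    doc.write();
                } catch (RuntimeException e) {
                    // Swallow the exception so the other documents still run,
                    // but remember which ones failed.
                    failed.add(doc);
                }
            })).get();
        } finally {
            pool.shutdown();
        }
        return List.copyOf(failed);
    }
}
```

This addresses the "difficult to check which document succeeded" caveat by returning the failed documents explicitly.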

    Using CompletableFuture, you could do it as follows:

    List<CompletableFuture<Void>> futures = docs.stream()
                        .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                        .collect(Collectors.toList());
    

    This will make sure that all documents are processed, and you can then inspect each future in the returned list for success or failure.
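
Putting that together, one way to wait for every future and then separate successes from failures (the class and method names here are illustrative, and Document is again a stand-in):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class WriteAllFutures {
    // Illustrative Document: writes named "bad..." are assumed to fail
    static class Document {
        final String name;
        Document(String name) { this.name = name; }
        void write() {
            if (name.startsWith("bad")) {
                throw new RuntimeException("write failed: " + name);
            }
        }
    }

    static List<Document> failedDocuments(Set<Document> docs, int concurrencyLevel) {
        ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
        try {
            // Fix an order so each future can be matched back to its document
            List<Document> ordered = new ArrayList<>(docs);
            List<CompletableFuture<Void>> futures = ordered.stream()
                    .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                    .collect(Collectors.toList());
            // allOf completes only once every future has completed (normally
            // or exceptionally); handle() swallows the aggregate exception so
            // join() does not throw and each future can be inspected instead.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .handle((ok, ex) -> null)
                    .join();
            List<Document> failed = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                if (futures.get(i).isCompletedExceptionally()) {
                    failed.add(ordered.get(i));
                }
            }
            return failed;
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the parallel-stream version, every document is attempted even when some writes throw, and the caller gets a precise list of the failures.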