Search code examples
javaspringasynchronousspring-cloud-sleuth

Propagate Sleuth baggage on parallel streams


This question is exactly the same as this one, which wasn't actually answered (that code only uses one thread). My code looks like this at the moment

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
            foo.stream().parallel()
                    .forEach(bar -> {
                                //business logic
                            }
                    );
            return null;
        }, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));

completableFuture.get();

yet only one thread is correctly traced. Using .parallelStream() or a LazyTraceExecutor directly instead of a TraceableExecutorService didn't help.


Solution

  • Seems to work thanks to this example. The snippet above becomes:

    TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
    CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();
    

    where runnablesBusinessLogic is

    private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (FooBar f : foo) {
            futures.add(CompletableFuture.runAsync(() -> {
                businessLogic(f);
                return;
            }, executorService));
        }
        return futures.toArray(new CompletableFuture[futures.size()]);
    }
    

    If I understood the example (and the discussion behind the current status of the documentation) correctly, Sleuth cannot work with a ForkJoinPool (and so with parallel streams) automatically. The main idea to make it work is not to create a CompletableFuture and split it, but to create several futures (and join them).