Search code examples
javaspring-bootmicrometermicrometer-tracingvirtual-threads

micrometer Instrumentation of Thread Switching Components for Executors.newVirtualThreadPerTaskExecutor() with for loop


I have the following piece of code:

    @PostMapping("/question")
    public String question(@RequestBody List<String> messages) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String message : messages) {
                executor.submit(() -> {
                    kafkaTemplate.send("topic-loom-micrometer", message + Thread.currentThread().getName());
                });
            }
            executor.shutdown();
        }
        return "it seems everything happened correctly";
    }

The code is straightforward. I receive a list of strings from a rest endpoint.

The goal is to send all the messages to Kafka in parallel using virtual threads.

Since there are many many messages, the code here is leveraging virtual threads to increase parallelism, as a traditional for loop would send messages one by one, taking much longer.

The goal is to observe the traces, despite using virtual threads.

The doc from micrometer is https://docs.micrometer.io/micrometer/reference/observation/instrumenting.html#instrumentation_of_thread_switching_components

There are challenges adapting to my piece of code:

1 - In the doc, it mentions Executors.newCachedThreadPool(), not sure if it would work for virtual thread.

2 - The use of then(registry.getCurrentObservation()).isSameAs(parent);, not sure where it comes from.

3 - The use of ContextExecutorService.wrap(executor) seems to be deprecated. But the doc is not saying what should be used in favour of.

4 - I adapted the code as follow:

 @PostMapping("/question2")
    public String question2(@RequestBody List<String> messages) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Observation parent = Observation.createNotStarted("parent", registry);
            for (String message : messages) {
                parent.observe(() -> {
                    ContextExecutorService.wrap(executor).submit(() -> {
                        return Observation.createNotStarted("child", registry)
                                .observe(() -> {
                                    kafkaTemplate.send("topic-loom-micrometer", message + Thread.currentThread().getName());
                                });
                            }
                    );
                });
            }
            executor.shutdown();
        }

Besides the above concerns, I have the following questions:

A - Is the adaptation correct since I am using a for loop?

B - It seems it needs a Callable<T>, how to make the code compile?


Solution

    1. Yes, Executors.newVirtualThreadPerTaskExecutor() should work fine as far as Observation Propagation is concerned. Observation Propagation engine should not differentiate between platform and virtual threads.

    2. then(registry.getCurrentObservation()).isSameAs(parent) is just an assertion used in testing. It is a method from org.assertj.core.api.BDDAssertions class. The documentation you mentioned uses the logic all over the place. You could see the working test examples of its usage in Micrometer GitHub repo, for example here. You could plainly ignore it in a non-test code.

    3. ContextExecutorService.wrap(executor) is indeed deprecated, but it is not that big of a deal, it seems. If you really don't want to use deprecated methods, you could use ContextExecutorService.wrap(ExecutorService, Supplier<ContextSnapshot>) method and provide the second argument effectively equivalent to one used in the deprecated method:

       ContextExecutorService.wrap(executor, 
               (Supplier<ContextSnapshot>) () -> 
           ContextSnapshotFactory.builder().build().captureAll());
      

    If, like in updated code of initial snippet, brought in the question, method returns only kind of OK string, then the adaptation may look like the following:

    @PostMapping("/question2")
    public String question2(@RequestBody List<String> messages) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Observation parent = Observation.createNotStarted("parent", registry);
            for (String message : messages) {
                parent.observe(() -> {
                    ContextExecutorService.wrap(executor,
                            (Supplier<ContextSnapshot>) () -> ContextSnapshotFactory.builder().build().captureAll())
                            .submit(() -> {
                                Observation.createNotStarted("child", registry).observe(() -> {
                                    kafkaTemplate.send("topic-loom-micrometer",
                                            message + Thread.currentThread().getName());
                                });
                            });
                });
            }
            executor.shutdown();
        }
        return "it seems everything went fine";
    }
    

    Micrometer Context Propagation is also discussed in Spring Boot 3 micrometer tracing in MDC thread.

    Here I leave out of the scope the question of benefits of virtual threads in the scenario the OP described. There are too many variables in this equation to solve and the OP has not (and probably could not) clarify all details, necessary to give a useful answer. After all, the question can be thought of as a one about Micrometer Context Propagation, virtual threads notwithstanding.