java, spring-webflux, project-reactor, mdc

Copy Reactor Context to MDC in the case of TimeoutMainSubscriber


I have been following the code at https://github.com/archie-swif/webflux-mdc/blob/master/src/main/java/com/example/webfluxmdc/MdcContextLifter.java to copy data from the Reactor Context into the MDC using hooks.

The solution works in most cases, but not when a Mono times out and TimeoutMainSubscriber is in use.

Below is the small test I used.

Mono<Integer> integerMono = Mono.just(1)
        .doOnEach(logger.addToContext(i -> ImmutableMap.of("index", i)))
        // Simulate slow work: sleep for 6 s, longer than the 5 s timeout below.
        .flatMap(___ -> Mono.fromSupplier(() -> {
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return 0;
            }
            return 1;
        }))
        // The timeout signal is raised on the parallel scheduler.
        .timeout(Duration.ofMillis(5000), Schedulers.parallel())
        .doOnEach(logger.info("testMDCLogging"))
        .doOnError(logger.error("testMDCErrorLogging"))
        .subscriberContext(logger.initContext());

When I verify how the Context propagates through the MDC lifters, onNext(), onError() and onComplete() are not called in TimeoutMainSubscriber (which is where the Context would be copied over to the MDC); only currentContext() is called.

How do I propagate the Context and copy it to the MDC so that it is available in doOnError and doOnEach when a Mono times out? I don't want to move the copy into currentContext(), because that method is called several times, so in my opinion it is not an optimal place for it.


Solution

  • Did you try overriding the onError method in the MdcContextLifter class so that it copies the Context as well?

        @Override
        public void onError(Throwable t) {
            copyToMdc(coreSubscriber.currentContext()); // similar to onNext()
            coreSubscriber.onError(t);
        }
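
To see why the copy has to happen in onError as well, here is a minimal, dependency-free sketch of the same idea. It is a model rather than Reactor code: java.util.concurrent.Flow.Subscriber stands in for Reactor's CoreSubscriber, a ThreadLocal map stands in for SLF4J's MDC, and the names MdcLifterSketch, ContextLifter, FAKE_MDC and mdcSeenOnTimeout are made up for illustration. The simulated timeout delivers onError from a fresh thread without any preceding onNext, which is roughly the situation described in the question.

```java
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

final class MdcLifterSketch {

    // Stand-in for SLF4J's MDC (assumption): one map of diagnostic values per thread.
    static final ThreadLocal<Map<String, String>> FAKE_MDC =
            ThreadLocal.<Map<String, String>>withInitial(Map::of);

    // Stand-in for MdcContextLifter (assumption): wraps the downstream subscriber.
    static final class ContextLifter<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<T> delegate;
        private final Map<String, String> context; // plays the role of currentContext()
        private final boolean copyOnError;         // toggles the fix from the answer

        ContextLifter(Flow.Subscriber<T> delegate, Map<String, String> context, boolean copyOnError) {
            this.delegate = delegate;
            this.context = context;
            this.copyOnError = copyOnError;
        }

        @Override public void onSubscribe(Flow.Subscription s) { delegate.onSubscribe(s); }

        @Override public void onNext(T item) {
            FAKE_MDC.set(context); // copy before handing the value downstream
            delegate.onNext(item);
        }

        @Override public void onError(Throwable t) {
            if (copyOnError) {
                FAKE_MDC.set(context); // the extra copy suggested in the answer
            }
            delegate.onError(t);
        }

        @Override public void onComplete() { delegate.onComplete(); }
    }

    // Signals an error from a new thread with no onNext first, and returns
    // the "index" value the downstream error handler finds in FAKE_MDC.
    static String mdcSeenOnTimeout(boolean copyOnError) {
        AtomicReference<String> seen = new AtomicReference<>();
        Flow.Subscriber<Integer> downstream = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { seen.set(FAKE_MDC.get().get("index")); }
            @Override public void onComplete() { }
        };
        Flow.Subscriber<Integer> lifted =
                new ContextLifter<>(downstream, Map.of("index", "1"), copyOnError);

        Thread timer = new Thread(() -> lifted.onError(new TimeoutException("simulated timeout")));
        timer.start();
        try {
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("copy in onError:    " + mdcSeenOnTimeout(true));  // prints 1
        System.out.println("no copy in onError: " + mdcSeenOnTimeout(false)); // prints null
    }
}
```

In the real MdcContextLifter the same reasoning would apply to onComplete() if anything downstream logs on completion, though whether that is needed depends on the pipeline.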