I have been following the code at https://github.com/archie-swif/webflux-mdc/blob/master/src/main/java/com/example/webfluxmdc/MdcContextLifter.java to put data from Reactor Context to MDC using hooks.
The solution works perfectly in most cases, except when a Mono times out and TimeoutMainSubscriber is in use.
Below is the small test that I used.
    Mono<Integer> integerMono = Mono.just(1)
        .doOnEach(logger.addToContext(i -> ImmutableMap.of("index", i)))
        .flatMap(___ -> Mono.fromSupplier(() -> {
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                return 0;
            }
            return 1;
        }))
        .timeout(Duration.ofMillis(5000), Schedulers.parallel())
        .doOnEach(logger.info("testMDCLogging"))
        .doOnError(logger.error("testMDCErrorLogging"))
        .subscriberContext(logger.initContext());
When verifying the propagation of the Context through the MDC lifters, I found that onNext(), onError() and onComplete(), where the Context is copied over to MDC, are not called for TimeoutMainSubscriber; only currentContext() is called.
How do I propagate the Context and copy it over to MDC so that it can be used in doOnError and doOnEach when the Mono times out? I don't want to move the copy to currentContext(), because that method is called several times and, in my opinion, that is not an optimal solution.
Did you try overriding the onError method in the MdcContextLifter class?
    @Override
    public void onError(Throwable t) {
        copyToMdc(coreSubscriber.currentContext()); // similar to onNext()
        coreSubscriber.onError(t);
    }
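To show the idea in isolation, here is a minimal sketch that uses only the JDK. It does not use Reactor or SLF4J, and it does not reproduce TimeoutMainSubscriber. FAKE_MDC, SimpleSubscriber, ContextCopyingSubscriber and RecordingSubscriber are hypothetical stand-ins I made up for MDC, CoreSubscriber, the lifter and a downstream operator. The point is only that the decorator refreshes the MDC before forwarding every signal, not just onNext, so a downstream onError can see the context values.

```java
import java.util.HashMap;
import java.util.Map;

final class MdcLifterSketch {

    // Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
    static final ThreadLocal<Map<String, String>> FAKE_MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Hypothetical stand-in for Reactor's CoreSubscriber (onSubscribe omitted).
    interface SimpleSubscriber<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onComplete();
        Map<String, String> currentContext();
    }

    // Decorator that copies the context into the MDC stand-in before every signal.
    static final class ContextCopyingSubscriber<T> implements SimpleSubscriber<T> {
        private final SimpleSubscriber<T> delegate;

        ContextCopyingSubscriber(SimpleSubscriber<T> delegate) {
            this.delegate = delegate;
        }

        @Override public void onNext(T value) { copyToMdc(); delegate.onNext(value); }
        @Override public void onError(Throwable t) { copyToMdc(); delegate.onError(t); }
        @Override public void onComplete() { copyToMdc(); delegate.onComplete(); }

        @Override public Map<String, String> currentContext() {
            return delegate.currentContext();
        }

        // Replace whatever the current thread's MDC stand-in holds with the context.
        private void copyToMdc() {
            Map<String, String> mdc = FAKE_MDC.get();
            mdc.clear();
            mdc.putAll(delegate.currentContext());
        }
    }

    // Downstream subscriber that records what the MDC stand-in held on error.
    static final class RecordingSubscriber implements SimpleSubscriber<Integer> {
        String indexSeenOnError;

        @Override public void onNext(Integer value) { }
        @Override public void onError(Throwable t) {
            indexSeenOnError = FAKE_MDC.get().get("index");
        }
        @Override public void onComplete() { }
        @Override public Map<String, String> currentContext() {
            return Map.of("index", "1");
        }
    }

    public static void main(String[] args) {
        RecordingSubscriber downstream = new RecordingSubscriber();
        SimpleSubscriber<Integer> lifted = new ContextCopyingSubscriber<>(downstream);
        // Simulate an error-only signal path, as with a timeout: no onNext happens first.
        lifted.onError(new RuntimeException("simulated timeout"));
        System.out.println("index seen in onError: " + downstream.indexSeenOnError);
    }
}
```

In the real MdcContextLifter the same change would mean calling copyToMdc(coreSubscriber.currentContext()) in onError and onComplete as well as in onNext. Whether that is enough for the timeout path depends on which subscribers the hook actually wraps, so treat it as something to try rather than a confirmed fix.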