Search code examples

Spring Integration Restore ReactorContext into Imperative Processing

With our complex IntegrationFlow that switches between Reactive and Imperative processing, we are seeing a behavior where the ObservationContext (traceId) is changed right after WebFlux.outboundGateway call.

We followed the documentation and are able to see the reactorContext available via IntegrationMessageHeaderAccessor.REACTOR_CONTEXT message header, however unable to restore this to ThreadLocal for Imperative processing.

what to be done at the below sample for restoring the ObservationContext from Reactor to Imperative.

   .handle(WebFlux.outboundGateway(m -> url, webClient)
           ec -> ec.customizeMonoReply((message, mono) -> mono.contextCapture()))
   // what should be done to restore same reactorContext to imperative
   .log(DEBUG, m -> "Imperative Processing starts, as well traceId is changed here :(")


  • Apparently this use-case is not covered. The reply from WebClient is handled on a different thread. And, as a fact, of direct execution, the rest of your flow. We can see that in logs of your application:

    2024-06-21T13:14:12.835-04:00  INFO 22896 --- [   scheduling-1] [6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04] c.i.sample.config.TracingConfig          : Request Headers: [Content-Type:"application/json", traceparent:"00-6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04-00"]
    2024-06-21T13:14:13.522-04:00  INFO 22896 --- [ctor-http-nio-3] [6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04] c.i.sample.config.TracingConfig          : Response Headers: [Access-Control-Allow-Origin:"*", Alt-Svc:"h3=":443"; ma=2592000", Content-Type:"application/json", Date:"Fri, 21 Jun 2024 17:14:13 GMT", Server:"Caddy", Vary:"Accept-Encoding", Transfer-Encoding:"chunked"]
    2024-06-21T13:14:13.544-04:00  INFO 22896 --- [oundedElastic-1] [6675b4e535753091d2d42ee9db350fa5-5e379125e41a00e1] o.s.integration.handler.LoggingHandler   : WebClient Response (with diferent traceId) in Imperative: {

    It is done from here in AbstractMessageProducingHandler:

    return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();

    I'm not sure why we do thread switching here, but we are loosing that context somehow.

    I found a workaround for you:

                .handle(WebFlux.outboundGateway(ECHO_GET, webClient)
                .channel(new ExecutorSubscribableChannel())
                .log(LoggingHandler.Level.INFO, m -> "WebClient Response (with diferent traceId) in Imperative: " + m.getPayload())

    That .channel(new ExecutorSubscribableChannel()) is not involved in the observation handling, so the next log() operator is able to restore context from the traceparent message header.

    I'd appreciate if you raise a GH issue for the context switching problem when we handle reactive reply.