Spring Integration Restore ReactorContext into Imperative Processing


In our complex IntegrationFlow, which switches between reactive and imperative processing, the ObservationContext (traceId) changes right after the WebFlux.outboundGateway call.

We followed the documentation and can see the Reactor context in the IntegrationMessageHeaderAccessor.REACTOR_CONTEXT message header, but we are unable to restore it to the ThreadLocal for imperative processing.

What should be done in the sample below to restore the ObservationContext from Reactor to imperative processing?

IntegrationFlow.from(someChannel())
   .handle(WebFlux.outboundGateway(m -> url, webClient)
                  .httpMethod(POST)
                  .expectedResponseType(String.class),
           ec -> ec.customizeMonoReply((message, mono) -> mono.contextCapture()))
   // what should be done to restore same reactorContext to imperative
   .log(DEBUG, m -> "Imperative Processing starts, as well traceId is changed here :(")
   .......
   .get();

Solution

  • Apparently this use-case is not covered. The reply from WebClient is handled on a different thread and, because the downstream execution is direct, so is the rest of your flow. We can see that in the logs of your application:

    2024-06-21T13:14:12.835-04:00  INFO 22896 --- [   scheduling-1] [6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04] c.i.sample.config.TracingConfig          : Request Headers: [Content-Type:"application/json", traceparent:"00-6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04-00"]
    2024-06-21T13:14:13.522-04:00  INFO 22896 --- [ctor-http-nio-3] [6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04] c.i.sample.config.TracingConfig          : Response Headers: [Access-Control-Allow-Origin:"*", Alt-Svc:"h3=":443"; ma=2592000", Content-Type:"application/json", Date:"Fri, 21 Jun 2024 17:14:13 GMT", Server:"Caddy", Vary:"Accept-Encoding", Transfer-Encoding:"chunked"]
    2024-06-21T13:14:13.544-04:00  INFO 22896 --- [oundedElastic-1] [6675b4e535753091d2d42ee9db350fa5-5e379125e41a00e1] o.s.integration.handler.LoggingHandler   : WebClient Response (with diferent traceId) in Imperative: {
    

    The thread switch is done here, in AbstractMessageProducingHandler:

    return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
    

    I'm not sure why we do thread switching here, but we are losing that context somehow.

    I found a workaround for you:

                .handle(WebFlux.outboundGateway(ECHO_GET, webClient)
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(String.class))
                .channel(new ExecutorSubscribableChannel())
                .log(LoggingHandler.Level.INFO, m -> "WebClient Response (with diferent traceId) in Imperative: " + m.getPayload())
    

    That .channel(new ExecutorSubscribableChannel()) is not involved in the observation handling, so the next log() operator is able to restore context from the traceparent message header.

    I'd appreciate it if you raised a GitHub issue for the context switching problem when we handle a reactive reply.
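
The two effects described in the answer (thread-bound context disappearing after a thread switch, and the context being recoverable from the traceparent header) can be modeled in plain Java. This is only a rough sketch under stated assumptions: TraceContextSketch, TRACE_ID, and the plain Map of headers are hypothetical stand-ins, not Spring Integration or Micrometer types, and the framework's real restore logic is more involved. The only external fact relied on is the W3C traceparent layout, version-traceId-parentId-flags.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TraceContextSketch {

    // Hypothetical stand-in for thread-bound tracing state (not a Micrometer type).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Extracts the trace id from a W3C traceparent value: version-traceId-parentId-flags.
    static String traceIdOf(String traceparent) {
        String[] parts = traceparent.split("-");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Unexpected traceparent: " + traceparent);
        }
        return parts[1];
    }

    // Sets the trace id on the calling thread, then reads the ThreadLocal on a worker thread.
    // The worker has its own (empty) ThreadLocal slot, so the value does not follow the work.
    static String readWithoutRestore(String traceId) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Callable<String> task = () -> TRACE_ID.get();
        TRACE_ID.set(traceId);
        try {
            return worker.submit(task).get();
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    // Restores the trace id on the worker thread from a header that travels with the message.
    static String readWithRestore(Map<String, String> headers) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Callable<String> task = () -> {
            TRACE_ID.set(traceIdOf(headers.get("traceparent")));
            try {
                return TRACE_ID.get();
            } finally {
                TRACE_ID.remove(); // do not leak state into the next task on this thread
            }
        };
        try {
            return worker.submit(task).get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String traceparent = "00-6675b4e4be3310898a8c92da890368d1-0d73b0e98d89cc04-00";
        System.out.println(readWithoutRestore(traceIdOf(traceparent)));
        System.out.println(readWithRestore(Map.of("traceparent", traceparent)));
    }
}
```

In this model the first call returns null, because the worker thread never saw the caller's ThreadLocal, while the second returns the trace id parsed from the header. That mirrors why a handler that rebuilds its context from the traceparent message header keeps the original traceId even though it runs on a different thread.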