Tags: spring, spring-integration, spring-micrometer, micrometer-tracing

Tracing in Spring Integration (Spring Boot 3.0.9)


I'm trying to use Spring Integration: I receive a message via JMS and send it to Kafka:

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
      return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
             .log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
             .channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
             .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
}

To test this, I send messages to MQ using a POST request and JmsTemplate:

@PostMapping("/mq")
public String sentToMq(@RequestBody final String body) {
     jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
         final var span = tracer.startScopedSpan("jms-send");
         try {
             final var context = span.context();
             m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
         } finally {
             span.end();
         }

         return m;
     });
     return "done";
}

Everything works fine except tracing. I set the b3 header manually before sending, but after the message is received, Spring Integration overrides it. What should I do to keep the incoming traceId?
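For reference, the value I write into the b3 property follows the single-header B3 format, {traceId}-{spanId}-{samplingState}, optionally followed by -{parentSpanId}. Below is a minimal stdlib-only sketch of formatting and parsing such a value. The B3SingleHeader class and its method names are hypothetical helpers written for illustration; they are not part of Micrometer Tracing or Brave.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not a Micrometer or Brave API.
final class B3SingleHeader {

    // traceId: 16 or 32 lower-hex chars; spanId: 16 lower-hex chars;
    // optional sampling state (0, 1 or d), then optional parent span id.
    private static final Pattern B3 = Pattern.compile(
            "^([0-9a-f]{16}|[0-9a-f]{32})-([0-9a-f]{16})(?:-([01d])(?:-([0-9a-f]{16}))?)?$");

    // sampled is null when the header carries no sampling decision.
    record Parsed(String traceId, String spanId, Boolean sampled) { }

    private B3SingleHeader() { }

    // Builds the same "traceId-spanId-flag" value as the property setter in the question.
    static String format(String traceId, String spanId, boolean sampled) {
        return traceId + "-" + spanId + "-" + (sampled ? "1" : "0");
    }

    // Parses a single-header b3 value, rejecting anything that does not match the format.
    static Parsed parse(String header) {
        Matcher m = B3.matcher(header);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a valid b3 single header: " + header);
        }
        String flag = m.group(3);
        // "d" (debug) implies the trace is sampled.
        Boolean sampled = (flag == null) ? null : Boolean.valueOf(!flag.equals("0"));
        return new Parsed(m.group(1), m.group(2), sampled);
    }
}
```

Parsing the b3 property of the message received on the consumer side and comparing its traceId with the one in the log output is one way to confirm whether the incoming trace was kept or replaced.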

I also have the following configuration: @EnableIntegrationManagement(observationPatterns = "*") and this bean:

    @Bean
    @GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
    public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
        return new ObservationPropagationChannelInterceptor(observationRegistry);
    }

Solution

  • I see where the problem is. The JmsMessageDrivenEndpoint does not propagate its registerObservationRegistry(ObservationRegistry) call down to the ChannelPublishingJmsMessageListener, so the IntegrationObservation.HANDLER observation never restores the trace from that b3 header.

    The observation on the MessageChannel consults only the current context, so it populates a fresh b3 header based on that context.

    Consider using Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer) instead, and call AbstractMessageListenerContainer.setObservationRegistry() to enable the container's JmsObservationDocumentation.JMS_MESSAGE_PROCESS consumer tracing.

    For the JmsMessageDrivenEndpoint itself, please raise a GitHub issue in Spring Integration.
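The container-based approach could look roughly like the sketch below. It is not a verified fix and rests on several assumptions:

  • The class name JmsToKafkaFlowConfig, the bean name, and the "in.queue" and "out.topic" values are placeholders standing in for mqInQueue and kafkaOutTopic from the question.
  • As far as I know, setObservationRegistry() on the JMS listener container is only available from Spring Framework 6.1, so a Spring Boot 3.0.9 application (Framework 6.0.x) would likely need an upgrade first.
  • B3 is assumed to be configured as the propagation format for tracing, so that the b3 message property is actually read on the consumer side.

```java
import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class JmsToKafkaFlowConfig {

    @Bean
    public IntegrationFlow jmsToKafkaFlow(ConnectionFactory connectionFactory,
                                          ObservationRegistry observationRegistry,
                                          KafkaTemplate<String, String> kafkaTemplate) {
        // Build the listener container explicitly so the observation registry
        // can be set on it; the adapter created from a bare ConnectionFactory
        // does not expose this.
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("in.queue"); // placeholder for mqInQueue
        // Assumed to require Spring Framework 6.1 or later.
        container.setObservationRegistry(observationRegistry);

        return IntegrationFlow
                .from(Jms.messageDrivenChannelAdapter(container))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .topic("out.topic")) // placeholder for kafkaOutTopic
                .get();
    }
}
```

With this setup the JMS_MESSAGE_PROCESS observation is started by the container itself, before the message reaches the integration flow, so the downstream channel and handler observations should pick up that trace as their current context instead of starting a new one.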