Tags: java, spring, spring-boot, spring-cloud-sleuth

Spring Sleuth - broken tracing on JMS ErrorHandler


I have a simple example that uses Spring Sleuth with JMS: https://github.com/gtiwari333/sleuth-jms-broken-tracing/tree/master

Here, a call to the /jms endpoint queues a message. When the message is received in the onMessage method, we make a GET call to /test and then throw MyException. We expect the trace ID to be passed along to the ErrorHandler, so that the same traceId appears in the log across the /jms, onMessage(), handleError(), and /test calls.

What I'm getting now / how to reproduce the error:

I ran the app and hit the localhost:8080/jms endpoint. In the log below, the trace ID is not propagated to the JmsListenerErrorHandler class, and a new trace ID is created for the GET call to /test made from the error handler:

2020-08-04 17:55:24.212  INFO [,225c47fb814f6584,225c47fb814f6584,true] 16956 --- [nio-8080-exec-1] sleuth.SleuthApplication                 : Queuing message ...
2020-08-04 17:55:24.282  INFO [,225c47fb814f6584,eac851f1650ae8a6,true] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : JMS message received SOME MESSAGE !!!
2020-08-04 17:55:24.321  INFO [,225c47fb814f6584,612a7956f6b29a01,true] 16956 --- [nio-8080-exec-3] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< fine up to here
2020-08-04 17:55:24.332  INFO [,,,] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : handling error by calling another endpoint ..     
<<<<<<<<< tracing lost here (still on the same enerContainer-1 listener thread)
2020-08-04 17:55:24.336  INFO [,4c163d0997076729,4c163d0997076729,true] 16956 --- [nio-8080-exec-2] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< new trace ID created

It looks like JMS receives and processes new messages on its own listener-container thread. Sleuth has the necessary instrumentation logic to intercept and propagate the trace/span IDs to the @JmsListener code, but it doesn't propagate them to the org.springframework.util.ErrorHandler. The relevant listener-container code paths are:

  • org.springframework.jms.listener.DefaultMessageListenerContainer.AsyncMessageListenerInvoker
  • org.springframework.jms.listener.AbstractPollingMessageListenerContainer#doReceiveAndExecute
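Note that the error-handler log line runs on the same enerContainer-1 thread as the listener. A plausible explanation (an assumption, not verified against the Sleuth source) is that the tracing instrumentation opens the span scope only around the listener call and closes it before the container hands the exception to the ErrorHandler. The stdlib-only Java model below sketches that ordering; TraceContextModel and its members are hypothetical names, and a ThreadLocal<String> stands in for Sleuth's current span.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of a listener container; NOT Sleuth or Spring code.
final class TraceContextModel {
    // Stand-in for the thread-local current span that Sleuth copies into the log MDC.
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();
    // Captured log lines; an empty trace slot "[]" models Sleuth's [,,,].
    static final List<String> LOG = new ArrayList<>();

    static void log(String msg) {
        String traceId = CURRENT_TRACE_ID.get();
        LOG.add("[" + (traceId == null ? "" : traceId) + "] " + msg);
    }

    // Models the tracing wrapper around the listener: scope opened before, closed in finally.
    static void tracedListener(String traceId, Runnable listener) {
        CURRENT_TRACE_ID.set(traceId);
        try {
            listener.run();
        } finally {
            CURRENT_TRACE_ID.remove(); // closed even though the listener threw
        }
    }

    // Models the container: call the traced listener, then the error handler on the SAME thread.
    static void deliver(String traceId, Runnable listener, Consumer<Throwable> errorHandler) {
        try {
            tracedListener(traceId, listener);
        } catch (RuntimeException ex) {
            errorHandler.accept(ex); // the scope opened above is already closed here
        }
    }
}
```

Delivering a message whose listener logs and then throws records the trace ID on the listener's line and an empty slot on the error handler's line, which matches the [,,,] entry in the log above even though no thread switch happens.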

The Code:

The @RestController and @JmsListener:

@RestController
static class Ctrl {

    @Autowired RestTemplate restTemplate;
    @Autowired JmsTemplate jmsTemplate;

    @GetMapping("/test")
    void test() {
        log.info("test1 called");
    }

    @GetMapping("/jms")
    void jms() {
        log.info("Queuing message ...");
        jmsTemplate.convertAndSend("test-queue", "SOME MESSAGE !!!");
    }

    @JmsListener(destination = "test-queue", concurrency = "5")
    void onMessage(TextMessage message) throws JMSException {
        log.info("JMS message received {}", message.getText());
        restTemplate.getForEntity("http://localhost:8080/test", Void.class); // trace is propagated: works
        throw new MyException("Some Error");  // ErrorHandler then runs without the trace: doesn't work
    }
    static class MyException extends RuntimeException {
        public MyException(String msg) { super(msg); }
    }
}

The Error Handler:

@Component
static class JmsListenerErrorHandler implements ErrorHandler {

    @Autowired RestTemplate restTemplate;

    @Override
    public void handleError(Throwable t) {
        log.info("handling error by calling another endpoint .."); // tracing is lost here (log shows [,,,])
        restTemplate.getForEntity("http://localhost:8080/test", Void.class);
    }
}

The JMS Config:

@Configuration
@EnableJms
static class ActiveMqConfig implements JmsListenerConfigurer {

    @Autowired ErrorHandler jmsListenerErrorHandler;

    @Autowired ConnectionFactory connectionFactory;

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(containerFactory());
    }

    @Bean
    JmsListenerContainerFactory<?> containerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(jmsListenerErrorHandler);
        return factory;
    }
}

What I tried (to make it a complete SO question):

It's in this PR: https://github.com/gtiwari333/sleuth-jms-broken-tracing/pull/1/files
There, I tried to create a custom Executor bean wrapped by LazyTraceThreadPoolTaskExecutor and pass it to the JmsListenerContainerFactory.

It works for normal thread execution, but not for the JMS listener:

executor.execute(() -> log.info("Im inside thread 2")); //it works
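A likely reason (an assumption) is that a context-propagating executor such as LazyTraceThreadPoolTaskExecutor captures the trace context when a task is submitted. That helps for executor.execute(...) called inside a traced request, but the listener container submits its long-running invoker tasks at startup, when there is no trace, and each message's span is created later inside that loop. The hypothetical ContextCapturingExecutor below is a stdlib-only model of capture-at-submission, not the Sleuth class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical model of a context-propagating executor; NOT the LazyTraceThreadPoolTaskExecutor source.
final class ContextCapturingExecutor implements Executor {
    // Stand-in for the thread-local current span.
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    private final Executor delegate;

    ContextCapturingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // The context is captured when the task is SUBMITTED, not when it runs.
        String captured = CURRENT_TRACE_ID.get();
        delegate.execute(() -> {
            String previous = CURRENT_TRACE_ID.get();
            set(captured);
            try {
                task.run();
            } finally {
                set(previous); // restore whatever the worker thread had before
            }
        });
    }

    private static void set(String traceId) {
        if (traceId == null) {
            CURRENT_TRACE_ID.remove();
        } else {
            CURRENT_TRACE_ID.set(traceId);
        }
    }

    // Usage sketch: returns what each task saw as its current trace.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        // Runs tasks inline for simplicity; a real pool would run them on worker threads.
        Executor executor = new ContextCapturingExecutor(Runnable::run);

        // 1) Submitted from inside a traced request: the trace ID travels with the task.
        set("225c47fb814f6584");
        executor.execute(() -> seen.add(String.valueOf(CURRENT_TRACE_ID.get())));
        set(null);

        // 2) Submitted at "container startup" with no active trace: nothing to capture.
        executor.execute(() -> seen.add(String.valueOf(CURRENT_TRACE_ID.get())));
        return seen;
    }
}
```

In this model demo() yields the trace ID for the first task and "null" for the second, which is the situation a container-level invoker task would be in.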

Has anyone already figured out how to intercept the ErrorHandler so that the trace ID is passed along?


Solution

  • There is an open issue about the instrumentation of @JmsListener, so I guess this is not supported at the moment.

    A possible workaround is to pass the current Span along in the exception:

    @RestController
    static class Ctrl {
        @Autowired
        private Tracer tracer;
        // ...
        @JmsListener(destination = "test-queue", concurrency = "5")
        void onMessage(TextMessage message) throws JMSException {
            //..
            throw new MyException("Some Error", tracer.currentSpan()); // <-- pass the current span
        }
    }
    

    Then you can retrieve it in JmsListenerErrorHandler:

    @Override
    public void handleError(Throwable t) {
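        // The container wraps the listener's exception (e.g. in a ListenerExecutionFailedException), hence getCause()
        if (t.getCause() instanceof MyException) {
            MyException mEx = (MyException) t.getCause();
            log.info("Failing span: {}", mEx.getSpan());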
        }
        //...
    }
    

    MyException class:

    class MyException extends RuntimeException {
        private final Span span;

        public MyException(String msg, Span span) {
            super(msg);
            this.span = span;
        }

        public Span getSpan() {
            return span;
        }
    }
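    Logging the span only reports it; to make the handler's own log line and its RestTemplate call join the original trace, the carried span would also need to be put back in scope inside handleError, for example with Brave's Tracer.withSpanInScope(Span) if the app runs Sleuth 2.x on Brave (an assumption; check the API of your Sleuth version). The stdlib-only model below sketches that restore-and-close pattern; CarriedContextDemo, Scope and withTraceInScope are hypothetical names, and a ThreadLocal<String> stands in for the current span.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the "carry the span in the exception" workaround; NOT Sleuth/Brave API.
final class CarriedContextDemo {
    // Stand-in for the thread-local current span (what Sleuth copies into the log MDC).
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();
    static final List<String> LOG = new ArrayList<>();

    static void log(String msg) {
        String traceId = CURRENT_TRACE_ID.get();
        LOG.add((traceId == null ? "-" : traceId) + " " + msg);
    }

    // Counterpart of MyException: remembers the context that was current when it was created.
    static final class MyException extends RuntimeException {
        private final String traceId;

        MyException(String msg, String traceId) {
            super(msg);
            this.traceId = traceId;
        }

        String getTraceId() {
            return traceId;
        }
    }

    // A scope whose close() does not throw, so it works neatly with try-with-resources.
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    // Puts traceId in scope and returns a handle that restores the previous value.
    static Scope withTraceInScope(String traceId) {
        String previous = CURRENT_TRACE_ID.get();
        CURRENT_TRACE_ID.set(traceId);
        return () -> {
            if (previous == null) {
                CURRENT_TRACE_ID.remove();
            } else {
                CURRENT_TRACE_ID.set(previous);
            }
        };
    }

    // Counterpart of JmsListenerErrorHandler.handleError.
    static void handleError(Throwable t) {
        // The container usually wraps the listener's exception, so look at the cause first.
        Throwable cause = t.getCause() != null ? t.getCause() : t;
        if (cause instanceof MyException) {
            try (Scope ignored = withTraceInScope(((MyException) cause).getTraceId())) {
                log("handling error by calling another endpoint ..");
            }
        } else {
            log("handling error without a carried trace");
        }
    }
}
```

    Calling handleError with a wrapper exception whose cause is a MyException carrying 225c47fb814f6584 logs the line under that trace ID, and closing the scope restores the previous (empty) context so nothing leaks into the next message handled on that container thread.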