Search code examples
apache-kafkaspring-kafkaspring-cloud-sleuth

Configure to start new span in KafkaMessageListenerContainer not on listener (Kafka + Sleuth)


I'm new in kafka + sleuth. Currently I've faced with an issue, span creates in MessageListenerMethodInterceptor which triggers on KafkaListener, but in case of exception we are loosing spanId and traceId in ErrorHandler. Is it possible to configure sleuth to start span in KafkaMessageListenerContainer?


Solution

  • I have handled it. In my local pc it works fine, but I have some doubts...

    1. I disabled MessageListenerMethodInterceptor by setting spring.sleuth.messaging.kafka.enabled=false in application.properties

    2. I created a record interceptor, implementation of org.springframework.kafka.listener.RecordInterceptor as following:

       private final Tracer tracer;
       private final KafkaTracing kafkaTracing;
      
       private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
       private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();
      
       @Override
       public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
           openSpan(record);
           return RecordInterceptor.super.intercept(record, consumer);
       }
      
       @Override
       public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
           Span span = CURRENT_SPAN.get();
           if (span != null) {
               String message = Optional.ofNullable(exception.getMessage()).orElseGet(() -> exception.getClass().getSimpleName());
               span.tag("error", message);
           }
       }
      
       @Override
       public void clearThreadState(Consumer<?, ?> consumer) {
           closeSpan();
       }
      
       private void openSpan(ConsumerRecord<K, V> record) {
           Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
           Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
           CURRENT_SPAN.set(span);
           CURRENT_SPAN_IN_SCOPE.set(spanInScope);
           if (LOG.isDebugEnabled()) {
               LOG.debug("created span {}", span);
           }
       }
      
       private void closeSpan() {
           Span span = CURRENT_SPAN.get();
           if (span != null) {
               try {
                   if (LOG.isDebugEnabled()) {
                       LOG.debug("finishing span {}", span);
                   }
                   span.finish();
               } catch (Exception ex) {
                   LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
               } finally {
                   CURRENT_SPAN.set(null);
               }
           }
      
           Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
           if (spanInScope != null) {
               try {
                   if (LOG.isDebugEnabled()) {
                       LOG.debug("closing SpanInScope {}", spanInScope);
                   }
                   spanInScope.close();
               } catch (Exception ex) {
                   LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
               } finally {
                   CURRENT_SPAN_IN_SCOPE.set(null);
               }
           }
       }
      

    other methods are empty.

    The bean of this Interceptor created as follow:

        @Bean
        public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
            return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
        }
    

    PS. If I missed something or implemented it in wrong way please add a comment.