spring-kafka, spring-cloud-sleuth

Add trace_id in Kafka producer callback


I need a way to add the Spring Cloud Sleuth trace ID to logs written in a Kafka producer callback.

I added the Sleuth starter to my POM and created a Kafka producer. I'm looking for a way to include the current trace_id in the logs written in the callback.

ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(
    new ListenableFutureCallback<>() {
      @Override
      public void onFailure(Throwable throwable) {
        log.error(
            MessageFormat.format(
                "Error when sending message {0} to Kafka", data.getGlobalUUID()),
            throwable);
        deferredResult.setErrorResult(
            new MarkusKafkaInputException(
                MessageFormat.format(
                    "Error proceed {0} with message: {1}",
                    data.getGlobalUUID(), throwable.getMessage())));
      }

      @Override
      public void onSuccess(SendResult<K, V> result) {
        log.trace(result.toString());
      }
    });

Sleuth dependency in the POM:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

I expect the trace_id to appear in the logs written in the callback. Or is there no way to get the trace_id asynchronously?


Solution

  • I found a way to do it, but I'm not sure that it is correct:

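    // Imports for the enclosing file (Sleuth 2.x uses Brave's Span and Tracer).
    import brave.Span;
    import brave.Tracer;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    // Nested in the producer class; `log` is that class's static logger.
    private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {

      private final Span span;
      private final Tracer tracer;

      private TracingCallback(Span span, Tracer tracer) {
        this.span = span;
        this.tracer = tracer;
      }

      @Override
      public void onFailure(Throwable throwable) {
        // Put the captured span back in scope so the log line carries its trace id.
        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
          log.error("Error when sending message to Kafka", throwable);
        } finally {
          span.finish();
        }
      }

      @Override
      public void onSuccess(SendResult<K, V> result) {
        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
          log.trace(result.toString());
        } finally {
          span.finish();
        }
      }
    }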
    

    and in the calling code:

    @Autowired private Tracer tracer;
    
    ...
    
    ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
    send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
    ...
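
    A likely reason the trace id is missing in the first place: Kafka producer callbacks generally run on the producer's I/O thread, while the current span (and the logging context derived from it) is by default kept in a thread-local on the calling thread, so it does not follow the callback. The TracingCallback above works around this by capturing the span before the send and re-scoping it inside the callback. The sketch below shows the same capture-and-restore idea in plain Java, without any Sleuth or Kafka classes. TraceContextDemo, TRACE_ID, withTraceId and run are hypothetical names used only for illustration; they are not part of Sleuth, Brave or spring-kafka.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TraceContextDemo {

    // Stand-in for a thread-local trace context (hypothetical, not a Sleuth API).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Wraps a callback so that it runs with the trace id captured at wrap time.
    static Runnable withTraceId(Runnable callback) {
        final String captured = TRACE_ID.get(); // read on the calling thread
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                callback.run();
            } finally {
                // Restore whatever the worker thread had before the callback.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Runs one plain and one wrapped callback on another thread
    // and returns what each of them saw as the trace id.
    static List<String> run(String traceId) throws InterruptedException {
        List<String> seen = new CopyOnWriteArrayList<>();
        // Single worker thread, playing the role of the producer's I/O thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            ioThread.execute(() -> seen.add("plain=" + TRACE_ID.get()));
            ioThread.execute(withTraceId(() -> seen.add("wrapped=" + TRACE_ID.get())));
        } finally {
            TRACE_ID.remove();
            ioThread.shutdown();
            ioThread.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run("abc123")); // prints [plain=null, wrapped=abc123]
    }
}
```

    The plain callback sees no trace id because the thread-local was set on a different thread. The wrapped callback sees it because the value was read before the hand-off, which is the role tracer.currentSpan() and tracer.withSpanInScope(span) play in the TracingCallback.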