I'm new to Kafka and Sleuth. I've run into an issue: the span is created in MessageListenerMethodInterceptor, which is triggered when the KafkaListener is invoked, so if the listener throws an exception we lose the spanId and traceId in the ErrorHandler. Is it possible to configure Sleuth to start the span in KafkaMessageListenerContainer instead?
I found a way to handle it. It works fine on my local machine, but I still have some doubts.
I disabled MessageListenerMethodInterceptor by setting
    spring.sleuth.messaging.kafka.enabled=false
in application.properties.
Then I created a record interceptor, an implementation of org.springframework.kafka.listener.RecordInterceptor, as follows:
    private final Tracer tracer;
    private final KafkaTracing kafkaTracing;

    // Span state for the record currently being processed on this thread.
    private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
    private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();

    // Open the span before the record reaches the listener.
    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        openSpan(record);
        return RecordInterceptor.super.intercept(record, consumer);
    }

    // Tag the current span with the exception message, or with the class name if there is no message.
    @Override
    public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
        Span span = CURRENT_SPAN.get();
        if (span != null) {
            String message = Optional.ofNullable(exception.getMessage())
                    .orElseGet(() -> exception.getClass().getSimpleName());
            span.tag("error", message);
        }
    }

    // Finish the span and close its scope once the thread state is cleared.
    @Override
    public void clearThreadState(Consumer<?, ?> consumer) {
        closeSpan();
    }

    private void openSpan(ConsumerRecord<K, V> record) {
        // Continue the trace from the record headers (if any) and make the span current on this thread.
        Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
        Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
        CURRENT_SPAN.set(span);
        CURRENT_SPAN_IN_SCOPE.set(spanInScope);
        if (LOG.isDebugEnabled()) {
            LOG.debug("created span {}", span);
        }
    }

    private void closeSpan() {
        Span span = CURRENT_SPAN.get();
        if (span != null) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("finishing span {}", span);
                }
                span.finish();
            } catch (Exception ex) {
                LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
            } finally {
                // remove() drops the thread-local entry instead of leaving a null value behind
                CURRENT_SPAN.remove();
            }
        }
        Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
        if (spanInScope != null) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("closing SpanInScope {}", spanInScope);
                }
                spanInScope.close();
            } catch (Exception ex) {
                LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
            } finally {
                CURRENT_SPAN_IN_SCOPE.remove();
            }
        }
    }
The other methods are empty.
The bean for this interceptor is created as follows:
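For anyone who wants to check the cleanup logic without a Kafka or Brave setup, here is a minimal, dependency-free sketch of the same open / tag / close lifecycle. FakeSpan, FakeScope and the class name SpanLifecycleSketch are hypothetical stand-ins invented for this illustration, not Brave or Sleuth types. The sketch deliberately differs from the interceptor in one respect: it closes the scope before finishing the span, which is the order a try-with-resources block followed by a finally block would give. Whether that order matters in this setup is an open question rather than something the sketch proves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Dependency-free sketch; FakeSpan and FakeScope are hypothetical stand-ins, not Brave types.
final class SpanLifecycleSketch {

    // Stand-in for a span: records its tags and appends lifecycle events to a shared list.
    static final class FakeSpan {
        final Map<String, String> tags = new LinkedHashMap<>();
        private final List<String> events;

        FakeSpan(List<String> events) {
            this.events = events;
            events.add("span started");
        }

        void tag(String key, String value) {
            tags.put(key, value);
        }

        void finish() {
            events.add("span finished");
        }
    }

    // Stand-in for the scope that keeps a span "current" until it is closed.
    static final class FakeScope implements AutoCloseable {
        private final List<String> events;

        FakeScope(List<String> events) {
            this.events = events;
            events.add("scope opened");
        }

        @Override
        public void close() {
            events.add("scope closed");
        }
    }

    static final ThreadLocal<FakeSpan> CURRENT_SPAN = new ThreadLocal<>();
    static final ThreadLocal<FakeScope> CURRENT_SCOPE = new ThreadLocal<>();

    // Mirrors openSpan(record): start a span and put it in scope on this thread.
    static void open(List<String> events) {
        CURRENT_SPAN.set(new FakeSpan(events));
        CURRENT_SCOPE.set(new FakeScope(events));
    }

    // Same fallback as in failure(...): use the class name when the exception has no message.
    static String errorTagValue(Exception exception) {
        return Optional.ofNullable(exception.getMessage())
                .orElseGet(() -> exception.getClass().getSimpleName());
    }

    // Mirrors failure(...): tag the current span, if there is one.
    static void fail(Exception exception) {
        FakeSpan span = CURRENT_SPAN.get();
        if (span != null) {
            span.tag("error", errorTagValue(exception));
        }
    }

    // Variation on closeSpan(): close the scope first, then finish the span,
    // and always remove both thread-local entries even if one step throws.
    static void close() {
        FakeScope scope = CURRENT_SCOPE.get();
        FakeSpan span = CURRENT_SPAN.get();
        try {
            if (scope != null) {
                scope.close();
            }
        } finally {
            CURRENT_SCOPE.remove();
            try {
                if (span != null) {
                    span.finish();
                }
            } finally {
                CURRENT_SPAN.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        open(events);
        fail(new IllegalStateException()); // no message, so the class name becomes the tag value
        FakeSpan span = CURRENT_SPAN.get();
        close();
        System.out.println(events);
        System.out.println(span.tags);
        System.out.println("span left on thread: " + CURRENT_SPAN.get());
    }
}
```

Running main prints the recorded event order, the tags set on the stand-in span, and whether anything is still attached to the thread afterwards. That last check matters because consumer threads are reused for the next record.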
    @Bean
    public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
        return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
    }
PS: If I missed something or implemented it the wrong way, please add a comment.