Need a possibility to add tracing id from spring-sleuth in kafka producer callback
I add a sleuth starter to my POM and create a kafka-producer. I'm looking for a way to add current trace_id to the logs in callback.
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(
new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable throwable) {
log.error(
MessageFormat.format(
"Error when sending message {0} to Kafka", data.getGlobalUUID()),
throwable);
deferredResult.setErrorResult(
new MarkusKafkaInputException(
MessageFormat.format(
"Error proceed {0} with message: {1}",
data.getGlobalUUID(), throwable.getMessage())));
}
@Override
public void onSuccess(SendResult<K, V> result) {
log.trace(result.toString());
}
});
Sleuth dependency in POM -
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
I expect trace_id in logs, which are used in callback. Or there is no ways to get trace_id asynchronously?
Find the way to do it, but not sure that it is correct
private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {
private final Span span;
private final Tracer tracer;
private TracingCallback(Span span, Tracer tracer) {
this.span = span;
this.tracer = tracer;
}
@Override
public void onFailure(Throwable throwable) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
@Override
public void onSuccess(SendResult<K, V> kvSendResult) {
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.error("test111");
} finally {
span.finish();
}
}
}
and in code use -
@Autowired private Tracer tracer;
...
ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
...