
Quarkus OpenTelemetry not propagating traces to the SmallRye messaging consumer of another Quarkus microservice


I have two Quarkus applications, microserviceA and microserviceB. Both services have the dependencies below, along with the other required dependencies:

implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka:3.9.5'
implementation 'io.opentelemetry:opentelemetry-extension-trace-propagators'
implementation 'io.quarkus:quarkus-opentelemetry'

microserviceA has a consumer. This consumer makes some gRPC calls and an HTTP call, adds data to the database, and then produces a message to another topic, which is consumed by microserviceB.

Here I lose the trace on microserviceB: it creates a new trace.

Following the Kafka section of the Quarkus messaging part of this doc (https://quarkus.io/guides/opentelemetry), I am manually passing the context from the producer to the consumer, but still no luck. A comment on a GitHub discussion also suggests passing the context manually (https://github.com/quarkusio/quarkus/discussions/26353).

In the Jaeger monitoring tool, the consumer in microserviceB shows up under a different trace.

As mentioned above, the consumer in microserviceA writes to the database and then produces a message to another topic for microserviceB to consume. The database calls and the Kafka produce do not depend on each other, so I put the producing part in an async call, see below.

@Channel("data-updates-out")
Emitter<DataResponse> kafkaEmitter;

public void publishToKafka(String id, List<Data> data) {
    Log.infof("Producing data for id: %s, %s", id, data);
    OutgoingKafkaRecordMetadata<String> kafkaMetadata =
            OutgoingKafkaRecordMetadata.<String>builder().withKey(id).build();
    // Capture whatever OpenTelemetry context is current on the thread running this method.
    TracingMetadata tm = TracingMetadata.withCurrent(Context.current());
    DataResponse response = DataResponse.newBuilder().addData(data).build();
    Message<DataResponse> message =
            Message.of(response, Metadata.of(kafkaMetadata, tm));

    // The send runs on another thread (ForkJoinPool.commonPool by default).
    CompletableFuture.runAsync(() -> kafkaEmitter.send(message));
}

On the consumer side, in microserviceB, I check for the tracing context and attach it.

@Incoming("data-updates-in")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public CompletionStage<Void> consumerData(Message<DataResponse> message) {
    // If the message carries tracing metadata, log its context and attach it.
    message.getMetadata(TracingMetadata.class).ifPresent(metadata -> {
        Log.infof("tracing context: %s", metadata.getCurrentContext());
        QuarkusContextStorage.INSTANCE.attach(metadata.getCurrentContext());
    });
    Log.infof("Received data event with value : %s", message.getPayload());
    return message.ack();
}

None of the above worked, so I tried adding the dependency below. That didn't work either.

implementation("io.opentelemetry:opentelemetry-extension-trace-propagators")

NOTE: Quarkus version: 3.9.2, quarkus-opentelemetry: 3.9.2

This is the kind of log I get when producing a message from microserviceA.

2024-05-02T12:49:20.082Z INFO  traceId= parentId=, spanId=, sampled= [co.sp.pr.XProducer] (ForkJoinPool.commonPool-worker-3) Producing message
   

Please help me understand what I am doing wrong.


Solution

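  • CompletableFuture.runAsync runs the task on a different thread (the log above shows ForkJoinPool.commonPool-worker-3 with an empty traceId and spanId). OpenTelemetry doesn't pass the context to a new thread out of the box, so capture the context on the calling thread and make it current inside the async task. Look at the example below.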

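        // Capture the context on the calling thread, before handing work to another thread.
        Context currentContext = Context.current();
        CompletableFuture.runAsync(() -> {
            // Make the captured context current on the worker thread;
            // closing the Scope restores whatever was current there before.
            try (Scope scope = currentContext.makeCurrent()) {
                publishStateToKafka(xState);
            }
        });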
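
To see why the capture-and-restore step is needed, here is a minimal sketch that uses only the JDK. It does not use the OpenTelemetry API: a plain ThreadLocal named TRACE_ID stands in for the thread-bound current context, and the class name, method names, and the "trace-123" value are all made up for illustration. A single-thread executor is passed explicitly so the task is guaranteed to run on a thread other than the caller's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextHandoffSketch {
    // Hypothetical stand-in for a thread-bound tracing context (NOT the OpenTelemetry API).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Reads the value on the worker thread with no hand-off from the caller.
    static String readWithoutHandoff(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> TRACE_ID.get(), pool).join();
    }

    // Captures the value on the caller thread and re-installs it inside the task,
    // mirroring Context.current() followed by makeCurrent() in the answer above.
    static String readWithHandoff(ExecutorService pool) {
        String captured = TRACE_ID.get();
        return CompletableFuture.supplyAsync(() -> {
            TRACE_ID.set(captured);
            try {
                return TRACE_ID.get();
            } finally {
                // Clean up so the reused worker thread does not keep the value,
                // similar in spirit to closing the Scope.
                TRACE_ID.remove();
            }
        }, pool).join();
    }

    // Returns { value seen with hand-off, value seen without hand-off }.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("trace-123");
            String with = readWithHandoff(pool);
            String without = readWithoutHandoff(pool);
            return new String[] { with, without };
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("with hand-off:    " + result[0]);
        System.out.println("without hand-off: " + result[1]);
    }
}
```

With the real OpenTelemetry API, Context also offers wrap(Runnable) and the static taskWrapping(Executor) helpers, which do the same capture-and-restore for you; whether they fit depends on how your async calls are structured.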