java trace opentracing otel

Construct trace context from a string in the OpenTelemetry Java SDK


I have an ETL pipeline with 5 steps. Each step can be executed on a different thread and in a different application.

That makes it really hard to pass the OTel trace context down through everything, because the pipeline internals are not available to me; all I have access to are the processing steps.

What I am looking for is a way to construct the trace context from just a single ID that uniquely identifies a complete run of one data entry through all 5 steps.

Example:

Data package  Step  Thread     Application
id_1234       1     thread_01  app_01
id_1234       2     thread_02  app_01
id_1234       3     thread_10  app_02
id_1234       4     thread_01  app_01
id_1234       5     thread_05  app_02
id_5555       1     thread_05  app_02
id_5555       2     thread_01  app_02
id_5555       3     thread_05  app_01
id_5555       4     thread_06  app_02
id_5555       5     thread_15  app_02

What I am looking for is code that works a little bit like this:

public class Step1 {
    public void execute(DataPackage obj){
         var otelContext = SpanContext.create(
              TraceId.fromBytes(obj.getUniqueId().getBytes()),
              SpanId.fromBytes(processorName.getBytes()),
              TraceFlags.getDefault(),
              TraceState.getDefault()
         );

         var wrap = Span.wrap(otelContext);

         var with = Context.root().with(wrap);
  
         var span = tracer.spanBuilder("Step1").setParent(with).startSpan();
         CompletableFuture.runAsync(() -> { /* the code is here */ }).whenComplete((c1, exception) -> {
             if (exception != null) {
                 span.recordException(exception);
             } else {
                 span.end();
             }
         });
    }
}

What happens is that the start and the end of the span are separate and do not end up under the same trace context, so something goes wrong here.


I am now trying to construct the context by hand:

    var paddedArray = new byte[16];

    var originalArray = context.getId().getBytes();
    System.arraycopy(originalArray, 0, paddedArray, 16 - originalArray.length, originalArray.length);

    var wrap = Span.wrap(SpanContext.createFromRemoteParent(
        TraceId.fromBytes(paddedArray),
        SpanId.fromBytes(paddedArray),
        TraceFlags.getDefault(),
        TraceState.getDefault())
    );
    var otelContext = Context.root().with(wrap);

    var startSpan = tracer.spanBuilder(context.getId())
        .setParent(otelContext)
        .startSpan();

The issue I had previously was that the input byte array was not of the correct length. I have fixed that now, but the next issue is that the span does not appear in Jaeger at all.

I suspect it's because the context never got created in Jaeger, since this code always assumes the context already exists. Is there a way to always "upsert" a context?

I really have no way of knowing whether a context should be created or not, since ETL pipeline 1 can run before ETL pipeline 2 or vice versa.


Solution

  • I think the available APIs should be quite good at handling your problem.

    Can you try the following code and let me know if it works for you?

    Also have a look at the following: https://opentelemetry.io/docs/languages/java/instrumentation/#get-the-current-span

    import io.opentelemetry.api.trace.*;
    import io.opentelemetry.context.Scope;
    import io.opentelemetry.context.Context;
    import java.util.concurrent.CompletableFuture;
    
    public class EtlStep {
        private final Tracer tracer;
    
        public EtlStep(Tracer tracer) {
            this.tracer = tracer;
        }
    
        public void execute(DataPackage obj, int step) {
            // Retrieve the current span context if available
            SpanContext parentContext = Span.fromContext(Context.current()).getSpanContext();
    
            Span span = tracer.spanBuilder("Step" + step)
                .setParent(Context.current().with(Span.wrap(parentContext)))
                .startSpan();
    
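            // makeCurrent() only sets the span as current on the calling thread;
            // the scope is closed again as soon as the task has been submitted.
            try (Scope scope = span.makeCurrent()) {
                CompletableFuture.runAsync(() -> {
                    // Your code here
                }).whenComplete((unused, exception) -> {
                    if (exception != null) {
                        span.recordException(exception);
                    }
                    // End the span on both the success and the failure path
                    span.end();
                });
            }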
        }
    }
    
    class DataPackage {
        private final String uniqueId;
    
        public DataPackage(String uniqueId) {
            this.uniqueId = uniqueId;
        }
    
        public String getUniqueId() {
            return uniqueId;
        }
    }
    

    And here is the test initialization (just a toy example):

    // Create EtlStep instances
    EtlStep step1 = new EtlStep(tracer);
    EtlStep step2 = new EtlStep(tracer);
    EtlStep step3 = new EtlStep(tracer);
    EtlStep step4 = new EtlStep(tracer);
    EtlStep step5 = new EtlStep(tracer);
    
    // Create data packages
    DataPackage dataPackage1 = new DataPackage("id_1234");
    DataPackage dataPackage2 = new DataPackage("id_5555");
    
    // Execute ETL steps for each data package
    step1.execute(dataPackage1, 1);
    step2.execute(dataPackage1, 2);
    step3.execute(dataPackage1, 3);
    step4.execute(dataPackage1, 4);
    step5.execute(dataPackage1, 5);
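
On the original question of building a trace context from a plain string: here is a minimal sketch, assuming it is acceptable to hash the unique ID into the fixed-size IDs OpenTelemetry expects (a trace ID is 16 bytes / 32 hex characters, a span ID is 8 bytes / 16 hex characters). The class `DeterministicIds` and its methods are hypothetical helpers, not part of the OpenTelemetry API; only the JDK is used, and hashing with SHA-256 is my assumption, not something the SDK prescribes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives stable, correctly sized IDs from an arbitrary string.
final class DeterministicIds {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    private DeterministicIds() {}

    // 16 bytes of the digest rendered as 32 lowercase hex characters.
    static String traceIdHex(String uniqueId) {
        return hex(sha256(uniqueId), 0, 16);
    }

    // 8 bytes from a different part of the digest, rendered as 16 hex characters.
    static String spanIdHex(String uniqueId) {
        return hex(sha256(uniqueId), 16, 8);
    }

    private static byte[] sha256(String value) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be provided by every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    private static String hex(byte[] bytes, int offset, int length) {
        char[] out = new char[length * 2];
        for (int i = 0; i < length; i++) {
            int b = bytes[offset + i] & 0xff;
            out[2 * i] = HEX[b >>> 4];
            out[2 * i + 1] = HEX[b & 0x0f];
        }
        return new String(out);
    }
}
```

Every step that calls `DeterministicIds.traceIdHex("id_1234")` gets the same trace ID, no matter which thread or application it runs in. The two strings can then be passed to `SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.getSampled(), TraceState.getDefault())` and wrapped with `Span.wrap(...)` as in the question. Note the use of `TraceFlags.getSampled()` rather than `TraceFlags.getDefault()`: the default flags mark the parent as not sampled, and with a parent-based sampler (the SDK default, as far as I know) the child spans would then likely be dropped before they reach Jaeger. The synthetic parent span itself is never exported, so the UI may show the steps as siblings with a missing parent, but they should share one trace.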