OpenTracing with Kafka Streams - How to?

I am trying to integrate Jaeger tracing into Kafka Streams. I plan to add tracing to a few of my most important pipelines and am wondering what would be a good way to pass the trace ID from one pipeline to another.

Here is what I have so far: at the start of the stream processing pipeline, I start a server span and save the trace ID into a state store. Later on, in a transform step, I access the state store and pick up the trace inside the transform() method. Is this a good way to handle tracing in stream processing?

  .filter((k, v) -> v.isPresent())            
  .filter((k, v) -> isInterestingEvent(v))    
  .transform(() -> new TransformerWithTracing<SomeObjectA, SomeObjectB>(IN_MEM_STORE_NAME, someFunction), IN_MEM_STORE_NAME)
  .flatMapValues(c -> c)
  .to(outTopic, Produced.with(Serdes.String(), new EventSerde()));

import java.util.function.Function;  // assumed; the original does not show which Function is used
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import io.opentracing.Scope;

public class TransformerWithTracing<V, VR> implements Transformer<String, V, KeyValue<String, VR>> {

  final Function valueAction;
  final String storeId;
  private KeyValueStore<String, String> traceIdStore;

  public TransformerWithTracing(String storeId, Function valueAction) {
    this.storeId = storeId;
    this.valueAction = valueAction;
  }

  public void init(ProcessorContext context) {
    // Look up the store registered under storeId in the .transform(...) call.
    this.traceIdStore = (KeyValueStore<String, String>) context.getStateStore(storeId);
  }

  public KeyValue<String, VR> transform(String key, V value) {
    // BuildTraceHeader (elided; serviceTracer and traceHeader are not shown in this snippet)
    try (Scope scope = serviceTracer.startServerSpan(traceHeader, "Converting to Enterprise Event")) {
      return KeyValue.pair(key, (VR) valueAction.apply(value));
    }
  }

  public KeyValue<String, VR> punctuate(long timestamp) {
    return null;
  }

  public void close() {
//    if (streamId != null)   traceIdStore.delete(streamId);
  }
}
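
The store-based hand-off described above can be sketched without Kafka. This is only a minimal illustration, not the actual pipeline: a plain HashMap stands in for the state store, the trace ID is a random UUID rather than one issued by Jaeger, and TraceIdHandoff, startTrace, lookupTrace, and finish are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class TraceIdHandoff {

  // Hypothetical stand-in for the KeyValueStore<String, String> state store.
  private final Map<String, String> traceIdStore = new HashMap<>();

  /** First stage: create a trace ID for this record key and remember it. */
  public String startTrace(String recordKey) {
    // Assumption: a random 32-character hex string plays the role of the trace ID.
    String traceId = UUID.randomUUID().toString().replace("-", "");
    traceIdStore.put(recordKey, traceId);
    return traceId;
  }

  /** Later stage: look up the trace ID saved for this record key, if any. */
  public Optional<String> lookupTrace(String recordKey) {
    return Optional.ofNullable(traceIdStore.get(recordKey));
  }

  /** Drop the entry once the record is done so the store does not keep growing. */
  public void finish(String recordKey) {
    traceIdStore.remove(recordKey);
  }

  public static void main(String[] args) {
    TraceIdHandoff handoff = new TraceIdHandoff();
    String started = handoff.startTrace("order-42");
    String seenLater = handoff.lookupTrace("order-42").orElse("<none>");
    System.out.println(started.equals(seenLater));                   // prints "true"
    handoff.finish("order-42");
    System.out.println(handoff.lookupTrace("order-42").isPresent()); // prints "false"
  }
}
```

Because the sketch keys the store by record key, two in-flight records with the same key would overwrite each other's trace ID, and entries stay in the store until finish is called.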



  • There are similar ideas in this zipkin/brave repo by @jeqo.

    There also seems to be something available in the opentracing-contrib repo, but it seems to trace only at the producer/consumer level.
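
For context, producer/consumer-level tracing typically means the trace context travels with each record in its headers instead of in a state store. The following is a minimal sketch of that idea, not the opentracing-contrib implementation: a plain Map stands in for Kafka record headers, the value follows Jaeger's `uber-trace-id` text format (`trace-id:span-id:parent-span-id:flags`), and HeaderTraceContext, inject, and extractTraceId are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderTraceContext {

  // Header name used by Jaeger's text-map propagation format.
  public static final String HEADER = "uber-trace-id";

  /** Producer side: write "traceId:spanId:parentSpanId:flags" into the headers. */
  public static void inject(Map<String, byte[]> headers, String traceId,
                            String spanId, String parentSpanId, int flags) {
    String value = traceId + ":" + spanId + ":" + parentSpanId + ":" + Integer.toHexString(flags);
    headers.put(HEADER, value.getBytes(StandardCharsets.UTF_8));
  }

  /** Consumer side: read the trace ID back, or null if the header is missing or malformed. */
  public static String extractTraceId(Map<String, byte[]> headers) {
    byte[] raw = headers.get(HEADER);
    if (raw == null) {
      return null;
    }
    String[] parts = new String(raw, StandardCharsets.UTF_8).split(":");
    return parts.length == 4 ? parts[0] : null;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for the headers of one Kafka record.
    Map<String, byte[]> headers = new HashMap<>();
    inject(headers, "abc123", "def456", "0", 1);
    System.out.println(extractTraceId(headers)); // prints "abc123"
  }
}
```

With this shape, any stage that can read the record's headers can continue the trace without a shared store.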

