Search code examples
apache-kafkaapache-kafka-streams

Kafka Stream/Table Join and Message Headers


Have spent a whole day googling and trying, and what I believe the current state is that the message headers are accessible from the Processor APIs.

I wanted to access to Process API from DSL, so I implemented a ValueTransformSupplier and from there I have access to the processor context which gives me access to the header of the Stream.

But here's the kicker ..

I am doing a Stream / Table join and the header I to access is from the Table record NOT the message header from the stream which is what the ProcessContext holds.

So, is there a way to access the headers on the message that is represented in the KTable from the Stream/Table join?


Solution

  • When you read data into a KTable, record headers are not store. You would need to copy the headers into the value to preserve them.

    KTable t = builder.stream("table-topic")
                      .transformValues(/* access headers and copy into value */)
                      .toTable();