Search code examples
apache-kafkaapache-kafka-streams

How to get current Kafka topic inside Kafka stream?


my scenario is I use make a lot of Kafka topic which share prefix (ex. house.door, house.room ) and consume all topic by using Kafka stream regex topic pattern API. everything look good,I get key and message of the data.

In order to process data I need topic name so I can do join base on topic name, but I do not know how to get topic name inside Kafka stream DSL.

one possible way to solve my problem is save topic name with my message. but It's would be better if I can get topic name directly.

so, How to get current Kafka topic inside Kafka stream ?


Solution

  • FAQ: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information

    Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

    With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process(), for example (same for Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() always return the current record’s metadata. Some caveats apply when calling the processor context within scheduled punctuate() function, see the Javadocs for details.

    If you use the DSL combined with a custom Transformer, for example, you could transform an input record’s value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.