I'm using `org.apache.kafka.streams.KafkaStreams`, and my topology looks like this, for example:
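```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Explicit String serdes give `value` the type String, so toUpperCase() compiles.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic1", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues((readOnlyKey, value) -> value.toUpperCase())
        .to("output-topic1", Produced.with(Serdes.String(), Serdes.String()));
builder.stream("input-topic2", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues((readOnlyKey, value) -> value.toUpperCase())
        .to("output-topic2", Produced.with(Serdes.String(), Serdes.String()));
```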
By default, Kafka Streams logs a summary like this every two minutes:
Processed 14 total records, ran 0 punctuators, and committed 11 total tasks since the last update
I would like a better overview of whether any messages are arriving in each input topic. Instead of just the total number of processed records, I want to see more metrics for each stream. Is it possible to give a stream a name and then extract, for example, the offset for `builder.stream("input-topic1")` and `builder.stream("input-topic2")` separately? Or is there a way to know how many records each stream processed in a given time frame?
Maybe I could use `.peek` and keep some kind of static variable outside the stream, but I consider that approach very bad practice.

> better overview if I have any incoming messages into each input topic
That is simply the consumer lag metric, which can be monitored, yes.
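As a sketch of reading such numbers per topic from inside the application: `KafkaStreams#metrics()` also returns the metrics of the embedded consumers, and the consumer fetch metrics (group `consumer-fetch-manager-metrics`) include a topic-level `records-consumed-total` and a partition-level `records-lag`, both tagged with `topic`. The class `PerTopicMetrics` and its `byTopic` method below are hypothetical helpers, not part of Kafka. Note that `records-consumed-total` counts records the consumer fetched, which is close to, but not necessarily identical with, the records your processors handled.

```java
import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

/** Hypothetical helper: aggregates one consumer fetch metric by its "topic" tag. */
public final class PerTopicMetrics {
    static final String FETCH_GROUP = "consumer-fetch-manager-metrics";

    private PerTopicMetrics() {}

    /**
     * Sums the named metric (e.g. "records-consumed-total" or "records-lag") per topic.
     * Summing covers several stream threads (one consumer each, distinct client-id tags)
     * and, for partition-level metrics such as "records-lag", all partitions of a topic.
     * Assumption: in some client versions '.' in topic names appears as '_' in this tag.
     */
    public static Map<String, Double> byTopic(Map<MetricName, ? extends Metric> metrics, String metricName) {
        Map<String, Double> perTopic = new TreeMap<>();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            String topic = name.tags().get("topic");
            // Keep only the requested fetch metric; client-level metrics have no "topic" tag.
            if (topic == null || !FETCH_GROUP.equals(name.group()) || !metricName.equals(name.name())) {
                continue;
            }
            Object value = entry.getValue().metricValue();
            // Skip non-numeric values and NaN (e.g. no data recorded yet).
            if (value instanceof Number && !Double.isNaN(((Number) value).doubleValue())) {
                perTopic.merge(topic, ((Number) value).doubleValue(), Double::sum);
            }
        }
        return perTopic;
    }
}
```

On a running instance you would call something like `PerTopicMetrics.byTopic(streams.metrics(), "records-consumed-total")`, for example from a scheduled task; the difference between two samples gives the records per time frame for each input topic.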
A related question was just asked: "exposing kafka topic LAGs using kafka jmx exporter".
> Maybe I could use .peek and have some kind of static variable out of the stream
You could use a Micrometer metric registry (`MeterRegistry`) instead. That's not a bad practice at all.
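A minimal sketch of that idea, assuming `micrometer-core` is on the classpath: one `Counter` per input topic, tagged with the topic name, and a registry that is passed in rather than held in a static field. The class `TopicRecordMetrics` and the metric name `kstream.input.records` are hypothetical choices for illustration.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;

/**
 * Hypothetical helper: one Micrometer counter per input topic.
 * The registry is injected, so no static state is needed.
 */
public class TopicRecordMetrics {
    // Illustrative metric name; not a Kafka or Micrometer convention.
    static final String METRIC_NAME = "kstream.input.records";

    private final MeterRegistry registry;

    public TopicRecordMetrics(MeterRegistry registry) {
        this.registry = registry;
    }

    /** Returns the topic's counter; Micrometer reuses an existing meter with the same name and tags. */
    public Counter counterFor(String topic) {
        return Counter.builder(METRIC_NAME)
                .description("Records seen on a Kafka Streams input topic")
                .tag("topic", topic)
                .register(registry);
    }
}
```

In the topology, you would look the counter up once, e.g. `Counter topic1 = metrics.counterFor("input-topic1");`, and increment it with `.peek((key, value) -> topic1.increment())` right after `builder.stream("input-topic1", ...)`. The counters are cumulative; records per time frame are usually derived by the monitoring backend (for example a rate over a window) rather than in the application.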