Tags: apache-kafka, prometheus, apache-kafka-streams, ktable

Kafka Streams KTable size monitoring


I have a stream topology that consumes from a topic, runs an aggregation, and builds a KTable that is materialized into RocksDB.

I have another application that consumes all events from that same topic daily and sends tombstone messages for events that meet some specific criteria (i.e. they are no longer needed). The aggregation handles these and deletes the corresponding entries from the state store, but I'd like to monitor either the size of the state store or the size of the changelog topic: anything, really, that tells me the size of the KTable.

I have exposed the JMX metrics, but nothing there appears to give me what I need. I can see the total number of "puts" into RocksDB, but not the total number of keys. My apps are Spring Boot applications, and I would like to expose the metrics via Prometheus.

Has anyone solved this, or does anyone have ideas that would help?


Solution

  • You can get an approximate entry count for each partition by accessing the KTable's underlying state store, calling KeyValueStore#approximateNumEntries(), and then exporting that count to Prometheus. You get one count per partition, so the size of the whole KTable is the sum over all partitions.

    To access the underlying state store, you can use the low-level Processor API: each StreamTask (one per partition) has a ProcessorContext through which you can obtain the KeyValueStore. Just add a KStream#transformValues() to your topology, passing the KTable's store name (the name you gave it with Materialized.as(...) in the aggregation):

    kStream
            ...
            .transformValues(ExtractCountTransformer::new, "your_ktable_name")
            ...
    

    Then, in ExtractCountTransformer, read the count and export it to Prometheus:

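    import lombok.extern.log4j.Log4j2;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    @Log4j2
    public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

        // Wildcard types: only the count is needed, and a KTable's store may
        // actually hold ValueAndTimestamp-wrapped values.
        private KeyValueStore<?, ?> yourKTableKvStore;
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // Works only if "your_ktable_name" was passed to transformValues()
            yourKTableKvStore = (KeyValueStore<?, ?>) context.getStateStore("your_ktable_name");
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            // Approximate entry count for this task's partition only
            long approxCount = yourKTableKvStore.approximateNumEntries();
            // Export approxCount to Prometheus here; this version only logs it
            log.debug("partition {} - approx count {}", context.partition(), approxCount);
            return value; // pass the record through unchanged
        }

        @Override
        public void close() {
            // Nothing to release: Kafka Streams manages the store's lifecycle
        }
    }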
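Because each transformer instance only sees its own partition, the per-partition counts need to be kept somewhere a metrics library can read them. Below is a minimal, stdlib-only sketch, assuming a hypothetical `PartitionSizeGauges` helper (not a Kafka Streams or Micrometer class) shared by all transformer instances in one application: `transform()` would call `update(context.partition(), approxCount)`, `close()` could call `remove(...)` for the partition the task was handling, and a gauge per partition (for example a Micrometer `Gauge`, which Spring Boot Actuator can expose on its Prometheus endpoint when `micrometer-registry-prometheus` is on the classpath) would read `valueFor(partition)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Kafka Streams): holds the latest
// approximateNumEntries() reading reported for each partition.
final class PartitionSizeGauges {

    // partition -> latest approximate entry count; concurrent because
    // several stream threads may report at the same time
    private final Map<Integer, Long> latest = new ConcurrentHashMap<>();

    // Overwrite rather than accumulate: each reading is already the
    // store's current estimate for that partition.
    void update(int partition, long approxEntries) {
        latest.put(partition, approxEntries);
    }

    // The task (and its partition) may have moved to another instance,
    // so stop reporting a stale value from this one.
    void remove(int partition) {
        latest.remove(partition);
    }

    // Value a per-partition gauge would read; NaN signals "no current
    // value" instead of reporting a misleading 0.
    double valueFor(int partition) {
        Long v = latest.get(partition);
        return v == null ? Double.NaN : v.doubleValue();
    }

    // Approximate KTable size over the partitions hosted by this instance.
    long localTotal() {
        long sum = 0;
        for (long v : latest.values()) {
            sum += v;
        }
        return sum;
    }
}
```

Storing the latest reading instead of adding readings together matters because approximateNumEntries() already returns the store's current estimate. Each application instance only reports the partitions it hosts, so the size of the whole KTable would be a sum across instances, e.g. with `sum()` in PromQL.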