apache-kafka prometheus apache-kafka-streams rocksdb rocksdb-java

Kafka Streams state store count


I have the following topology in Kafka Streams 7.2.2-ccs:

[Image: topology diagram]

Or in code:

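// Keep a reference to the builder so the topology can be built from it later.
val builder = StreamsBuilder()
val groupedStream = builder.stream<String, Quote>("quotes").groupByKey()
// One windowed aggregation (and therefore one window store) per window size.
for (windowSize in windows()) {
    groupedStream
        .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
        .aggregate({ Aggregator() }, { _, quote, aggregator -> aggregator.execute(quote) })
        // Emit only the final result of each window; the buffer has no size limit.
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .to("outputTopic")
}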

I am using io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics to monitor the application. I have some questions:

  1. Why aren't there any metrics for the unbounded suppressed store? There are many with the label rocksdb_window_state_id, but none for the suppression.
  2. How many RocksDB instances will be created if the input topic has 3 partitions? There seems to be a segment concept for window stores, but I couldn't find out how many segments are created per window.
  3. Is there a way to configure RocksDB to flush to disk all keys for windows that have closed? The container is using too much off-heap memory, the usage keeps growing, and I suspect this is the cause.

Solution

    1. Suppress is not backed by RocksDB; it uses an in-memory store. Among the available metrics there should be something like in-memory-suppression (cf. https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics).

    2. You should get 3 segments (i.e., 3 RocksDB instances) per store, times the number of partitions, i.e., 9 RocksDB instances for a window store with 3 input topic partitions.

    3. No, you cannot control the flushing. However, you can limit RocksDB memory usage by implementing RocksDBConfigSetter and registering the class via StreamsConfig (the rocksdb.config.setter property).
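The arithmetic in point 2 can be sketched as below. The function name and the extra factor for the number of windowed stores are assumptions added for illustration: each aggregate call in the loop from the question creates its own window store, so the figure of 9 would apply once per entry returned by windows(). The 3 segments per store are taken from the answer as-is.

```kotlin
// Rough estimate only: segments per store x input partitions x windowed stores.
// "windowedStores" is a hypothetical parameter, one per windows() entry in the question.
fun estimateRocksDbInstances(segmentsPerStore: Int, partitions: Int, windowedStores: Int): Int =
    segmentsPerStore * partitions * windowedStores

fun main() {
    // A single window size on a 3-partition topic, as in the answer.
    println(estimateRocksDbInstances(segmentsPerStore = 3, partitions = 3, windowedStores = 1))
    // A hypothetical windows() returning 4 sizes.
    println(estimateRocksDbInstances(segmentsPerStore = 3, partitions = 3, windowedStores = 4))
}
```

With many window sizes the instance count grows linearly, which may be part of why off-heap memory usage looks high.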
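For point 3, a minimal sketch of a RocksDBConfigSetter that bounds memory, assuming the shared-cache pattern described in the Kafka Streams memory management documentation. The class name BoundedMemoryRocksDBConfig, the helper withBoundedRocksDb, and the byte budgets are illustrative assumptions, not values recommended by the answer; tune them for your container.

```kotlin
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.BlockBasedTableConfig
import org.rocksdb.Cache
import org.rocksdb.LRUCache
import org.rocksdb.Options
import org.rocksdb.WriteBufferManager

class BoundedMemoryRocksDBConfig : RocksDBConfigSetter {

    override fun setConfig(storeName: String, options: Options, configs: MutableMap<String, Any>) {
        // Kafka Streams sets up a block-based table config by default; reuse it.
        val tableConfig = options.tableFormatConfig() as BlockBasedTableConfig
        // Every store (and every segment) shares one block cache.
        tableConfig.setBlockCache(cache)
        // Count index and filter blocks against the cache as well.
        tableConfig.setCacheIndexAndFilterBlocks(true)
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true)
        tableConfig.setPinTopLevelIndexAndFilter(true)
        // Memtables are charged to the same cache through the write buffer manager.
        options.setWriteBufferManager(writeBufferManager)
        options.setTableFormatConfig(tableConfig)
    }

    override fun close(storeName: String, options: Options) {
        // The cache and write buffer manager are shared, so they are not closed per store.
    }

    companion object {
        // Illustrative budgets (assumptions): 256 MiB in total, of which up to 64 MiB for memtables.
        private const val TOTAL_OFF_HEAP_BYTES = 256L * 1024 * 1024
        private const val TOTAL_MEMTABLE_BYTES = 64L * 1024 * 1024
        private const val INDEX_FILTER_BLOCK_RATIO = 0.1

        private val cache: Cache =
            LRUCache(TOTAL_OFF_HEAP_BYTES, -1, false, INDEX_FILTER_BLOCK_RATIO)
        private val writeBufferManager = WriteBufferManager(TOTAL_MEMTABLE_BYTES, cache)
    }
}

// Hypothetical helper: registers the setter on the properties passed to KafkaStreams.
fun withBoundedRocksDb(props: Properties): Properties {
    props[StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG] = BoundedMemoryRocksDBConfig::class.java
    return props
}
```

Because the cache lives in the companion object, the budget applies to the whole application instance rather than to each store. This does not bound the in-memory suppression buffer, which is configured separately through Suppressed.BufferConfig.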