Tags: apache-kafka-streams, rocksdb

Kafka Streams RocksDB retention doesn't remove old data with a windowed function


I'm running a Kafka Streams app with a windowed function. After running for 24 hours, local disk usage grew from 5 GB to 20 GB and keeps increasing. From what I found online, once I introduce windowedBy, old data should be removed automatically.

My topology looks like this:

stream.selectKey(...)                        // key A
    .groupByKey(...)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
    .reduce((value1, value2) -> value2)
    .suppress(...)
    .toStream()
    .selectKey(...)                          // key B
    .mapValues(...)
    .filter(...)
    .groupByKey()
    .reduce(...)
    .toStream()
    .to(...);

One thing I can't understand: this topology creates two internal repartition topics, repartition-03 and repartition-14, one for each groupByKey. Looking at the disks, all machines running repartition-03 tasks have high disk usage that never seems to drop old data, while machines running repartition-14 tasks always stay at low disk usage.

When I logged in to the machines, I found a different store path on each:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

Why do they have different paths? Task 2_40 handles repartition-14, and its path contains rocksdb, while the other path doesn't. Meanwhile, task 1_4 keeps several folders like KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000, each with a different suffix.
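The numeric suffix on those folders looks like the start time of a time segment, in epoch milliseconds. The sketch below decodes it, assuming (from my reading of Kafka Streams' `Stores.persistentWindowStore`; worth checking against your version) that the default segment interval is `max(retention / 2, 60 s)`. `SegmentSuffix` and its methods are hypothetical helpers, not Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decodes a segment directory name of a windowed store.
// Assumption: default segment interval = max(retention / 2, 60 s).
public class SegmentSuffix {

    // Assumed default segment interval: half the retention, at least one minute.
    static long segmentIntervalMs(Duration retention) {
        return Math.max(retention.toMillis() / 2, 60_000L);
    }

    // The part after the last '.' is read as the segment start in epoch millis.
    static long segmentStartMs(String dirName) {
        return Long.parseLong(dirName.substring(dirName.lastIndexOf('.') + 1));
    }

    public static void main(String[] args) {
        String dir = "KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000";
        long start = segmentStartMs(dir);
        long interval = segmentIntervalMs(Duration.ofDays(1)); // default 24h retention
        System.out.println("segment start: " + Instant.ofEpochMilli(start));
        System.out.println("segment interval (h): " + interval / 3_600_000L);
        System.out.println("start is a multiple of the interval: " + (start % interval == 0));
    }
}
```

It prints a segment start of 2019-09-18T12:00:00Z, a 12-hour interval, and `true`: the suffix is an exact multiple of 12 hours, which is consistent with (though not proof of) 12-hour segments under a 24h retention.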

I thought that once I introduced windowedBy, RocksDB would remove old data when the window expired. And why do the stores behind these two internal repartition topics have different paths and retention behavior?

Any help is highly appreciated! Thanks!


Solution

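  • The windowed store's default retention period is 24h, even though the window itself is only 60 minutes, so windowedBy alone does not purge data as soon as a window closes. You can reduce it via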

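    .reduce(..., Materialized.<K, V, WindowStore<Bytes, byte[]>>with(...).withRetention(...));

    The explicit type witness lets the compiler infer the window-store type when withRetention() is chained onto Materialized.with(). The retention must be at least the window size plus the grace period (60 minutes here), otherwise Kafka Streams rejects it.

  • The paths differ because the stores differ. KSTREAM-REDUCE-STATE-STORE-0000000003 backs the windowed reduce, so it is a segmented window store: each KSTREAM-REDUCE-STATE-STORE-0000000003.<timestamp> folder is one time segment, and a segment is deleted as a whole only once it falls entirely outside the retention period. KSTREAM-REDUCE-STATE-STORE-0000000014 backs the second, non-windowed reduce, so it is a plain key-value store under rocksdb/ that keeps one value per key and never expires entries.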
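To see why disk usage keeps growing for more than a day and how a shorter retention helps, here is a simplified, hypothetical model of a segmented window store in plain Java. It is not Kafka's actual code; it assumes a default segment interval of `max(retention / 2, 60 s)` and that whole segments are dropped once they fall behind `streamTime - retention`. The 2-hour retention is only an example value that satisfies "at least window size plus grace".

```java
import java.time.Duration;
import java.util.TreeMap;

// Simplified, hypothetical model of a segmented window store (not Kafka's code).
// Records land in fixed time segments; a segment is dropped as a whole once it
// is older than the segment containing (streamTime - retention).
public class SegmentRetentionModel {
    private final long retentionMs;
    private final long intervalMs;
    private final TreeMap<Long, Integer> segments = new TreeMap<>(); // segmentId -> record count
    private long streamTime = Long.MIN_VALUE;

    SegmentRetentionModel(Duration retention) {
        this.retentionMs = retention.toMillis();
        this.intervalMs = Math.max(retentionMs / 2, 60_000L); // assumed default interval
    }

    void put(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long minLiveSegment = Math.floorDiv(streamTime - retentionMs, intervalMs);
        long segmentId = Math.floorDiv(timestampMs, intervalMs);
        if (segmentId >= minLiveSegment) {
            segments.merge(segmentId, 1, Integer::sum);
        }
        // Drop every segment older than the oldest live one.
        segments.headMap(minLiveSegment).clear();
    }

    int liveSegments() {
        return segments.size();
    }

    int liveRecords() {
        return segments.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        for (Duration retention : new Duration[] {Duration.ofDays(1), Duration.ofHours(2)}) {
            SegmentRetentionModel store = new SegmentRetentionModel(retention);
            // One record per minute for 48 hours of stream time.
            for (long t = 0; t < 48 * 60; t++) {
                store.put(t * minute);
            }
            System.out.println(retention + ": " + store.liveSegments() + " segments, "
                    + store.liveRecords() + " records");
        }
    }
}
```

Running it prints `PT24H: 3 segments, 2160 records` and `PT2H: 3 segments, 180 records`. In this model the store holds up to roughly the retention plus one segment interval of data (36 hours at the 24h default), so lowering the retention shrinks the on-disk footprint roughly in proportion.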