I have the following topology definition and there are two application instances on the environment:
KStream<String, ObjectMessage> stream = kStreamBuilder.stream(inputTopic);
stream.mapValues(new ProtobufObjectConverter())
      .groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(100)))
      .aggregate(AggregatedObject::new, new ObjectAggregator(), buildStateStore(storeName))
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withMaxRecords(config.suppressionBufferSize())))
      .mapValues(new AggregatedObjectProtobufConverter())
      .toStream((key, value) -> key.key())
      .to(outputTopic);

private Materialized<String, AggregatedObject, WindowStore<Bytes, byte[]>> buildStateStore(String storeName) {
    return Materialized.<String, AggregatedObject, WindowStore<Bytes, byte[]>>as(storeName)
            .withKeySerde(Serdes.String())
            .withValueSerde(new JsonSerde<>(AggregatedObject.class));
}
This topology is created for multiple input topics in a for loop, so one application instance runs multiple topologies. Every topology gets a state store named from the pattern KSTREAM-AGGREGATE-%s-STATE-STORE-0000000001, e.g. Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001.
Now, until recently we hadn't configured the state.dir directory, and since we use a K8s StatefulSet, the store wasn't persisted between restarts, so the application had to rebuild its state, as far as I understand how Kafka Streams works.
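For reference, a minimal sketch of what configuring state.dir looks like, assuming the StatefulSet mounts a persistent volume; the mount path /data/kafka-streams and the other values here are illustrative assumptions, not from our actual setup:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Point Kafka Streams at a directory backed by the StatefulSet's
// persistent volume so RocksDB state survives pod restarts.
// "/data/kafka-streams" is an assumed mount path.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // assumed
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");   // assumed
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
```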
Our logs were full of lines like the one below, differing only in the time (the suffix after the last dot).
INFO 1 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001.1675576920000 in regular mode
However, the time in millis, 1675576920000, is from one day ago, and some timestamps are even from three days ago. Today I added the state.dir to the app, but this log still shows up all the time. Should we simply wait until everything has been processed, or are we doing something wrong?
Can someone explain why RocksDBTimestampedStore is logging so much? Also, why is the time being logged from those stores not 100 ms, as defined by the windowed operation?
If you use a windowed aggregation, the corresponding store is actually multiple RocksDB instances (so-called segments), each storing all windows for a particular time slice. IIRC, the minimum segment size is 60 seconds.
As time progresses, new segment stores are created (resulting in the log line you see) while older segments are cleaned up. In general it's nothing to worry about; note that it's just an INFO log.
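To make the timestamps in the log lines less surprising: the segment interval is derived from the store's retention (one day by default for windowed stores), not from the window size, with a floor around 60 seconds. A rough sketch of that calculation; the exact formula is internal to Kafka Streams and may vary by version, so treat this as an approximation:

```java
import java.time.Duration;

public class SegmentIntervalSketch {
    // Approximation of how Kafka Streams' segmented RocksDB window stores
    // pick a segment interval: half the retention, but at least 60 seconds.
    static long segmentIntervalMs(long retentionMs) {
        return Math.max(retentionMs / 2, 60_000L);
    }

    public static void main(String[] args) {
        // Default retention for a windowed store is 1 day unless overridden
        // (e.g. via Materialized#withRetention).
        long oneDay = Duration.ofDays(1).toMillis();
        System.out.println(segmentIntervalMs(oneDay)); // 43200000 (12 hours)

        // Even with a tiny 100 ms window and a very short retention,
        // the 60 s floor applies, so segment timestamps never align
        // with the 100 ms window boundaries.
        System.out.println(segmentIntervalMs(200)); // 60000
    }
}
```

This is why the suffix after the last dot (the segment's start timestamp) moves in coarse steps and can lag days behind: segments covering older time slices are still being opened and compacted until their retention expires.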