Tags: apache-kafka, apache-kafka-streams

Global window store cannot be restored


Through the StreamsBuilder, Kafka Streams provides a way to define global window stores. Once such a store is defined, it can be populated by firing events into the topic associated with it.
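For reference, here is roughly how such a store can be defined (a minimal sketch, assuming Kafka Streams 3.x; the store name "global-window-store", the topic "events", the serdes, and the retention/window sizes are all illustrative):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

StreamsBuilder builder = new StreamsBuilder();

builder.addGlobalStore(
    Stores.windowStoreBuilder(
            Stores.persistentWindowStore("global-window-store",
                Duration.ofDays(1),    // retention period
                Duration.ofMinutes(5), // window size
                false),                // retainDuplicates
            Serdes.String(), Serdes.String())
        .withLoggingDisabled(),        // global stores must not have a changelog
    "events",                          // topic backing the store
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new Processor<String, String, Void, Void>() {
        private WindowStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("global-window-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            // put() builds the internal windowed key from the record timestamp
            store.put(record.key(), record.value(), record.timestamp());
        }
    });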

By comparison, a local window store is populated through the put method, which builds the internal store key (see TimeFirstWindowKeySchema.toStoreKeyBinary). For a global store, however, nothing manages or builds that internal key. As a consequence, an exception is thrown when trying to restore a global window store, because the key format is not the one expected (see WindowKeySchema#extractStoreTimestamp).
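To illustrate, this is (as I read the source) essentially what the extraction does; a raw topic key that was never run through toStoreKeyBinary is too short for it:

import java.nio.ByteBuffer;

// Simplified from WindowKeySchema: toStoreKeyBinary appends an 8-byte
// window-start timestamp plus a 4-byte sequence number to the serialized
// record key, and extractStoreTimestamp reads the long 12 bytes from the end.
static long extractStoreTimestamp(final byte[] binaryKey) {
    final int TIMESTAMP_SIZE = 8;
    final int SEQNUM_SIZE = 4;
    return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
}

// During a global store restore the raw topic key is copied as-is; for an
// 8-byte key the index becomes 8 - 12 = -4, which produces the
// IndexOutOfBoundsException shown in the trace below.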

Implemented logic (a code sketch follows the list):

  • create a StreamsBuilder
  • add a global window store
  • start the stream
  • populate the stream by firing events into the topic associated with the store
  • stop the stream
  • create another stream with another temp dir to allow rebuilding the store from the topic
  • EXCEPTION
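
A minimal sketch of those steps, assuming the builder topology above and a broker at localhost:9092 (the application id, topic name, and temp-dir prefixes are illustrative):

import java.nio.file.Files;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-window-store-repro");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, Files.createTempDirectory("run1").toString());

// first run: the store is populated through the processor's put() calls
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    producer.send(new ProducerRecord<>("events", "some-key", "some-value"));
}

streams.close();

// second run: a fresh state dir forces a restore straight from the topic;
// the raw topic keys are copied into the window store without being wrapped,
// so WindowKeySchema#extractStoreTimestamp blows up
props.put(StreamsConfig.STATE_DIR_CONFIG, Files.createTempDirectory("run2").toString());
KafkaStreams restored = new KafkaStreams(builder.build(), props);
restored.start(); // EXCEPTION during global store restoration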

Exception

Caused by: java.lang.IndexOutOfBoundsException
at java.base/java.nio.Buffer.checkIndex(Buffer.java:749)
at java.base/java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:491)
at org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp(WindowKeySchema.java:211)
at org.apache.kafka.streams.state.internals.WindowKeySchema.segmentTimestamp(WindowKeySchema.java:75)

Questions:

  1. does it make sense to define a global window store?
  2. if yes, will/should Kafka use the timestamp of the record to link the record to a window?

General issue we are trying to solve:

We want an automatic way to clean up the RocksDB instance associated with a global store; when the global store is a key-value store, this database is never cleaned up.

Any ideas or suggestions are welcome.


Solution

  • You have a couple of options for handling this:

    1. Create a repartition topic with a key that the store can consume and manage. If the topic contains tombstones (records with a null value), the Kafka Streams API manages the cleanup for you. Is there any particular reason to use a GlobalStore, such as the need to access data across multiple partitions?
    2. Use the Processor API with a Punctuator: add some kind of timestamp to your data and have the punctuator clean up records once they expire. That can mean one day, a week, or whenever you feel the message is no longer relevant. Expiry can also be driven by a combination of object fields, so that a record is removed only once the object reaches a certain state.
    3. Use a TimestampedKeyValueStore with a Punctuator for cleanup (see the sketch after this list).

    I'd try to stay away from a GlobalStore, because these tend not to scale well and they increase the load on the brokers (unless your design mandates otherwise).
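
A minimal sketch of options 2 and 3 combined, assuming Kafka Streams 3.3+; the store name "events-store", the one-day TTL, and the hourly wall-clock punctuation are all illustrative:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

StreamsBuilder builder = new StreamsBuilder();

builder.addStateStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("events-store"),
        Serdes.String(), Serdes.String()));

builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
    .process(() -> new Processor<String, String, Void, Void>() {
        private TimestampedKeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("events-store");
            // every hour, scan the store and delete entries older than the TTL
            context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, now -> {
                final long ttlMs = Duration.ofDays(1).toMillis();
                try (KeyValueIterator<String, ValueAndTimestamp<String>> it = store.all()) {
                    while (it.hasNext()) {
                        final KeyValue<String, ValueAndTimestamp<String>> entry = it.next();
                        if (now - entry.value.timestamp() > ttlMs) {
                            store.delete(entry.key);
                        }
                    }
                }
            });
        }

        @Override
        public void process(final Record<String, String> record) {
            // keep the record timestamp alongside the value so the punctuator
            // can decide when the entry has expired
            store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
        }
    }, "events-store");

Because this uses a regular (partitioned) store, the deletes remove the entries from the local RocksDB immediately and write tombstones to the compacted changelog topic, so both get cleaned up without a global store being involved.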