Tags: apache-kafka, apache-kafka-streams, restore, stateful

Does rebuilding state stores in Kafka Streams propagate duplicate records to downstream topics?


I'm currently using Kafka Streams for a stateful application. The state is not stored in a Kafka state store, though, but only in memory for the time being. This means that whenever I restart the application, all state is lost and has to be rebuilt by processing every record from the start.

After doing some research on Kafka state stores, this seems to be exactly the solution I'm looking for to persist state between application restarts (either in memory or on disk). However, I find that the online resources lack some important details, so I still have a couple of questions about how this would work exactly:

  • If the stream is configured to start from offset latest, will the state still be (re)calculated from all the previous records?
  • If previously processed records need to be reprocessed in order to rebuild the state, will this propagate records through the rest of the Streams topology? For example, with InputTopic -> stateful processor -> OutputTopic, would rebuilding the state result in duplicate records in OutputTopic?
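For context, the topology in the second question could be sketched with the Kafka Streams DSL roughly as follows. This is a minimal sketch, not the poster's actual code: the count aggregation, topic names, and store name are all hypothetical, but `Materialized.as(...)` is the standard way to get a fault-tolerant state store backed by a changelog topic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StatefulTopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // InputTopic -> stateful processor -> OutputTopic
        builder.stream("InputTopic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               // Named, persistent state store; Kafka Streams backs it
               // with an internal changelog topic for fault tolerance.
               .count(Materialized.as("counts-store"))
               .toStream()
               .to("OutputTopic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```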

Solution

  • State stores use their own changelog topics, and Kafka Streams itself takes responsibility for restoring state from them. If your local state store is uninitialised, your Kafka Streams app will rehydrate it from the changelog topic using EARLIEST, since it has to read every record.

    This means the startup sequence for a brand new instance is roughly:

    • Observe there is no local state-store cache
    • Load the local state store by consuming from the state store's changelog topic (named <application.id>-<state-store-name>-changelog)
    • Read each record and update a local RocksDB instance accordingly
    • Do not emit anything downstream, since this restoration step is internal to Kafka Streams and does not run your actual topology
    • Read your consumer group's offsets using EARLIEST or LATEST according to how you configured the topology. Note that this only matters if your consumer group doesn't have any committed offsets yet
    • Process records, emitting output according to the topology
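The changelog topic name in the second step follows a fixed convention: Kafka Streams prefixes internal topics with the application id. A minimal sketch of how that name is derived (the application id and store name below are hypothetical):

```java
public class ChangelogTopicName {
    // Kafka Streams names internal changelog topics as
    // <application.id>-<state-store-name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Hypothetical application id and store name
        System.out.println(changelogTopic("my-stateful-app", "counts-store"));
        // → my-stateful-app-counts-store-changelog
    }
}
```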

    Whether you set your actual topology's auto.offset.reset to LATEST or EARLIEST is up to you. In the event your offsets are lost, or you create a new consumer group, it's a balance between potentially skipping records (LATEST) versus reprocessing old records and deduplicating downstream (EARLIEST).
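A minimal configuration sketch illustrating that choice, using plain property keys and a hypothetical application id. Remember that auto.offset.reset only applies when the consumer group has no committed offsets yet:

```java
import java.util.Properties;

public class StreamsOffsetConfig {
    public static Properties baseConfig() {
        Properties props = new Properties();
        // Hypothetical application id; also prefixes internal topic names
        // (changelog and repartition topics).
        props.put("application.id", "my-stateful-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Only used when the group has no committed offsets:
        // "earliest" = reprocess history (handle duplicates downstream),
        // "latest"   = skip straight to new records.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(baseConfig().getProperty("auto.offset.reset"));
        // → earliest
    }
}
```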

    Long story short: state restoration is different from processing, and is handled by Kafka Streams itself.