
How to restart a KafkaStreams consumer group in a way that avoids recreating the state store from its changelog topic


In a deployment with multiple nodes hosting KafkaStreams (0.10.2.1) instances with persistent state stores, what is the recommended way to restart all nodes without replaying the entire state store changelog topic? This has to be done without changing the application.id, as I don't want to lose the data I already have in the state store.

I increased session.timeout.ms so that all nodes are back up before the broker starts to reassign partitions, and I avoided calling KafkaStreams.close() to prevent an unneeded partition reassignment, since I'm restarting all nodes during deployment.
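
One way such a timeout could be set is sketched below. This is not the exact code in use: the class name `StreamsRestartConfig`, the method `build`, and all values are illustrative assumptions. Plain string keys are used so the snippet compiles without the Kafka libraries on the classpath; the resulting `Properties` would be passed to the Kafka Streams configuration, which forwards consumer settings such as session.timeout.ms to its embedded consumer.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
final class StreamsRestartConfig {
    private StreamsRestartConfig() {}

    // Builds the properties that would be handed to the Streams application.
    static Properties build(String applicationId, String bootstrapServers, int sessionTimeoutMs) {
        Properties props = new Properties();
        // Keep application.id unchanged across restarts so existing state is reused.
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Consumer setting: how long the group coordinator waits for a silent member
        // before it triggers a rebalance. The broker rejects values outside its
        // group.min.session.timeout.ms / group.max.session.timeout.ms range.
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

For example, `StreamsRestartConfig.build("my-app", "localhost:9092", 120000)` would request a two-minute session timeout; the application id and broker address here are placeholders.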

When the broker starts to reassign partitions (after all nodes are up), the KafkaStreams instances seem to replay the entire state store changelog topic instead of picking up from the offset they had reached just before the restart.

I guess that in order to pick up from the latest offset, these conditions have to be met:

1) Partitions will be assigned to instances containing their matching persistent store.

2) KafkaStreams will pick up from the latest offset in the changelog topic, instead of replaying the entire changelog.

Is there a way to achieve this?


Solution

  • Kafka Streams writes local state and local checkpoint files that are used to track the state store's health. A missing checkpoint file indicates a corrupted state store, so Kafka Streams wipes out the store and recreates it from scratch by replaying the state store's changelog topic.

    In 0.10.2.1, those local checkpoint files are written only on a clean shutdown. Thus, as you don't call KafkaStreams#close(), you don't get a clean shutdown (which might also corrupt your state, as some writes might not have been flushed to disk).

    In Kafka 0.11.0.x, local checkpoint files are written on every commit, which allows more aggressive reuse of local state stores.

    I would highly recommend upgrading to 0.11.0.1 or 1.0.0 (which will be released shortly); both contain many improvements to state store handling and rebalancing. Note that you don't need to upgrade your brokers for this, as Kafka Streams is compatible with older brokers, too (cf. https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility).
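
If you stay on 0.10.2.1 for now, the clean shutdown described above could be wired up with a JVM shutdown hook, roughly as sketched here. `CleanShutdown`, `onceOnly` and `register` are hypothetical names, not part of Kafka Streams, and the close action is passed in as a plain `Runnable` so the snippet compiles without the Kafka libraries.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Kafka Streams.
final class CleanShutdown {
    private CleanShutdown() {}

    // Wraps the close action so that it runs at most once,
    // even if it is triggered both manually and by the hook.
    static Runnable onceOnly(Runnable closeAction) {
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            if (closed.compareAndSet(false, true)) {
                closeAction.run();
            }
        };
    }

    // Registers the close action as a JVM shutdown hook and returns the guarded
    // action, so the caller can also invoke it directly.
    static Runnable register(Runnable closeAction) {
        Runnable guarded = onceOnly(closeAction);
        Runtime.getRuntime().addShutdownHook(new Thread(guarded, "streams-shutdown-hook"));
        return guarded;
    }
}
```

With a running `KafkaStreams streams` instance, this would be used as `CleanShutdown.register(streams::close);` right after `streams.start()`. Shutdown hooks run on a normal exit or SIGTERM but not when the process is killed with SIGKILL, so the deployment has to stop the nodes gracefully for the checkpoint files to be written.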