Search code examples
javakubernetesapache-kafkaapache-kafka-streams

Kafka Streams on Kubernetes: Long rebalancing after redeployment


The Problem

We use a StatefulSet to deploy a Scala Kafka Streams application on Kubernetes. The instances have separate applicationIds, so they each replicate the complete input topic for fault-tolerance. They are essentially read-only services that only read in a state topic and write it to the state store, from where customer requests are served via REST. That means, the consumer group always consist of only a single Kafka Streams instance at any given time.

Our problem is now that, when triggering a rolling restart, each instance takes about 5 minutes to start up, where most of the time is spent waiting in the REBALANCING state. I've read here that Kafka Streams does not send a LeaveGroup request in order to come back fast after a container restart, without rebalancing. How come this does not work for us and why does the rebalancing take so long, even though the applicationId is identical? Ideally, to minimize downtime, the application should take over immediately from where it left when it was restarted.

Config

Here are some configs we changed from the default values:

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])    

Questions / Related configs

  • Would it help to decrease session.timeout.ms? We set it to quite a large value as the Kafka brokers live in a different data center and network connections are at times not super reliable.
  • This answer suggests to decrease max.poll.interval.ms, as it is tied to a rebalance timeout. Is that correct? I'm hesitant to change this, as it might have consequences on the normal operation mode of our app.
  • There is mention of a config group.initial.rebalance.delay.ms to delay rebalancing during a deployment - but that would cause delays also after recovery from a crash, wouldn't it?
  • I also stumbled upon KIP-345, which targets to eliminate consumer rebalancing for static memberships entirely via group.instance.id, which would be a good fit for our user case, but it does not seem to be available yet on our brokers.

I'm confused by the multitude of configs and how to use them to enable fast recovery after an update. Can someone explain how they play together?


Solution

  • The other question you cite does not say that a rebalance is avoided on restart. Not sending a LeaveGroupRequest only avoids a rebalance when you stop the app. Hence, the number of rebalances is reduced from two to one. Of course, with your somewhat unusual single-instance deployment, you don't gain anything here (in fact, it might actually "hurt" you...)a

    Would it help to decrease session.timeout.ms? We set it to quite a large value as the Kafka brokers live in a different data center and network connections are at times not super reliable.

    Could be, depending how quickly you restart the app. (More details below.) Maybe just try it out (ie, set it to 3 minutes to still have a high value for stability and see it the rebalance time drop to 3 minutes?

    This answer suggests to decrease max.poll.interval.ms, as it is tied to a rebalance timeout. Is that correct? I'm hesitant to change this, as it might have consequences on the normal operation mode of our app.

    max.poll.interval.ms also affects rebalance time (more details below). However, default value is 30 seconds and thus should not result in a 5 minute rebalance time.

    There is mention of a config group.initial.rebalance.delay.ms to delay rebalancing during a deployment - but that would cause delays also after recovery from a crash, wouldn't it?

    This only applies to empty consumer groups and the default value is just 3 seconds. So it should not affect you.

    I also stumbled upon KIP-345, which targets to eliminate consumer rebalancing for static memberships entirely via group.instance.id, which would be a good fit for our user case, but it does not seem to be available yet on our brokers.

    Using static group membership might actually be the best bet. Maybe it's worth to upgrade your brokers to get this feature.

    Btw, the difference between session.timeout.ms and max.poll.interval.ms is explained in another question: Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions

    In general, the broker side group-coordinator maintains a list of all member per "group generation". A rebalance is triggered if a member leave the group actively (via sending LeaveGroupRequest), times out (via session.timeout.ms or max.poll.interval.ms), or a new member joins the group. If a rebalance happens, each member gets a chance to rejoin the group to be included in the next generation.

    For your case, the group has only one member. When you stop your app, no LeaveGroupRequest is sent and thus the group-coordinator would remove this member only after session.timeout.ms passed.

    If you restart the app, it comes back as a "new" member (from a group-coordinator point of view). This would trigger a reblance, giving all member of the group a change to re-join the group. For your case, the "old" instance might still be in the group and thus the rebalance would only move forward after the group-coordinator removed the old member from the group. The issue might be, that the group-coordinator thinks that the group scales out from one to two members... (This is what I meant above: if a LeaveGroupRequest would be sent, the group would become empty when you stop you app, and on restart only the new member would be in the group and the rebalance would move forward immediately.)

    Using static group membership would avoid the issue, because on restart the instance could be re-identified as the "old" instance, and the group-coordinator does not need to wait to expire the old group member.