Tags: java, spring, apache-kafka, spring-kafka

How to deal properly with CommitFailedException: "... it is likely that the consumer was kicked out of the group."?


I have a topic consumer that happened to be slow in processing and got kicked from Kafka, resulting in

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

While I do know the cause and the relevant configuration options (max.poll.interval.ms etc.), I am still unhappy that the consumer won't process any more messages until the application is restarted. Even worse, it seems that after a certain period the consumer group itself gets deleted by Kafka, so when the application is restarted, it has lost its offsets.

One could of course implement monitoring to identify lagging or lost consumer groups, but I'd prefer a proactive, self-healing implementation over relying on an alarm after bad things have already happened.

  1. Is there a way I can make the consumer reconnect to the server automatically without restarting the whole application? Am I missing some config option?

  2. I tried to implement a Spring health check, so the application will report "DOWN" if the consumer is kicked from the group (so at least Kubernetes could do the restart automatically), but I could not detect anything on the client side. I tried to inject the KafkaListenerEndpointRegistry and iterated the listenerContainers. Though one of them got kicked out of the group, all containers were reporting "running=true" and did not provide any flag or property or method that would indicate the error. Is there any way to detect a disconnected consumer from the spring Kafka client side?


Solution

  • For your first question:

    Is there a way I can make the consumer reconnect to the server automatically without restarting the whole application? Am I missing some config option?

    The Static Membership option introduced in Kafka brokers/clients (v2.3) mitigates this behavior. All you need to do, configuration-wise, is the following (from the docs linked above):

    Basically, what the Static Membership feature does is this: each consumer instance joins the group with a fixed group.instance.id, so if that instance leaves the group for any reason, no rebalance is triggered across the consumer group (until its session times out). Instead, the partitions assigned to the dead instance sit idle, with no processing taking place, until a member with the same group.instance.id comes back to life.

    • Upgrade both broker cluster and client apps to 2.3 or beyond, and also make sure the upgraded brokers are using inter.broker.protocol.version of 2.3 or beyond as well.
    • Set the config ConsumerConfig#GROUP_INSTANCE_ID_CONFIG to a unique value for each consumer instance under one group.
    • For Kafka Streams applications, it is sufficient to set a unique ConsumerConfig#GROUP_INSTANCE_ID_CONFIG per KafkaStreams instance, independent of the number of used threads for an instance.

    I recommend reading this article for more details on static vs. dynamic membership.

  • For your second question:

    I tried to implement a Spring health check, so the application will report "DOWN" if the consumer is kicked from the group

    Yes. If you are using the Kafka Streams API, you can register a StateListener, as in the sample below:

                streams.setStateListener((KafkaStreams.State newState, KafkaStreams.State oldState) -> {
                    logger.info("Streams state has changed from {} to {}", oldState, newState);
                    // Here you can set some global state that you expose via an API
                    // and query from the K8s health check.
                });
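The "global state" mentioned in the comment above can be as simple as an AtomicReference written by the listener and read by the health probe. A dependency-free sketch (the class and method names are illustrative, not from any library; in a Spring Boot app the isHealthy() check would typically live inside a HealthIndicator backing the Kubernetes liveness probe):

```java
import java.util.concurrent.atomic.AtomicReference;

public class StreamsHealth {

    // Latest KafkaStreams state name, written by the StateListener callback.
    private final AtomicReference<String> state = new AtomicReference<>("CREATED");

    // Call this from the StateListener, e.g. onStateChange(newState.name()).
    public void onStateChange(String newState) {
        state.set(newState);
    }

    // RUNNING and REBALANCING count as healthy KafkaStreams states;
    // ERROR, PENDING_SHUTDOWN and NOT_RUNNING should fail the probe
    // so Kubernetes restarts the pod.
    public boolean isHealthy() {
        String s = state.get();
        return s.equals("RUNNING") || s.equals("REBALANCING");
    }
}
```

With this in place, a consumer that permanently leaves the group drives the application "DOWN" and lets Kubernetes perform the restart automatically, which is exactly the self-healing behavior asked for.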