We are facing an issue with a set of Kafka consumers. Whenever there is activity on the Kafka cluster, such as rebooting the brokers (rolling restarts) or rebooting the VMs that run the brokers, our Kafka consumers leave the group (LeaveGroup) after failing to heartbeat. The logs below repeat for exactly one minute and correspond to the commitSync call made in the application code as part of consuming messages from the topic:
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Offset commit failed on partition <topic-name> at offset 455700: The coordinator is loading and hence can't process requests.
This interval corresponds to the default time for which the commitSync Java API of the Kafka consumer client keeps retrying (default.api.timeout.ms, 60 seconds by default in Kafka 2.0 and later clients).
After this, there are no logs for the next 5 minutes.
Thereafter, I see the following:
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Member consumer-13-837563e4-49e9-4bd1-aee4-cb21263e176a sending LeaveGroup request to coordinator <broker-host-name> (id: 2147483646 rack: null)
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
After this, messages pile up in the topic and the consumer lag keeps growing, since no consumers are left. We have to restart the app hosting the consumers to resume consumption.
What can we do to avoid this? Should anything be done on the consumer application side to handle this?
Note: For this particular consumer we used the Apache Kafka client library. We normally use the spring-kafka library to build our consumers, but we used the Apache client here because we wanted the pause and resume functions of the Kafka consumer, which are not supported in the version of spring-kafka we use.
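For reference, the one-minute and five-minute intervals in the logs appear to line up with two standard consumer settings. The minimal sketch below only sets them explicitly to what I understand to be their defaults in Kafka 2.0+ Java clients, to make that mapping visible; the class and method names are illustrative, and the values should be checked against the client version actually in use.

```java
import java.util.Properties;

class ConsumerTimeouts {
    // Makes the two timeouts behind the logs explicit. The values are, to my
    // understanding, the defaults for Kafka 2.0+ Java clients; verify them
    // against the client version you run.
    static Properties timeoutProps() {
        Properties props = new Properties();
        // Upper bound for blocking calls such as commitSync() with no explicit
        // timeout: the one-minute run of "coordinator is loading" commit retries.
        props.put("default.api.timeout.ms", "60000");
        // Maximum allowed gap between poll() calls; once exceeded, the member
        // leaves the group, as in the "poll timeout has expired" log line.
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        timeoutProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real consumer these entries would simply be merged into the rest of the configuration (bootstrap.servers, group.id, deserializers) passed to the KafkaConsumer constructor.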
Finally root-caused the issue. KafkaConsumer#commitSync was throwing an unchecked exception, TimeoutException (org.apache.kafka.common.errors.TimeoutException), because the new group coordinator had not finished loading offsets within the one minute for which commitSync retries when it encounters an error.
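One way to handle this transient error is to catch the timeout and retry the commit a bounded number of times instead of letting it escape the poll loop. The sketch below is JDK-only so it runs on its own: Committer and CommitTimeoutException are hypothetical stand-ins for consumer.commitSync() and org.apache.kafka.common.errors.TimeoutException, and the attempt count and backoff are arbitrary. KafkaConsumer also offers a commitSync(Duration) overload if a per-call bound is preferred over default.api.timeout.ms.

```java
class CommitRetry {
    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException,
    // which is also unchecked (it ultimately extends RuntimeException).
    static class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical abstraction over a synchronous commit; with the real client
    // this would be the lambda () -> consumer.commitSync().
    interface Committer {
        void commit();
    }

    // Retries a synchronous commit up to maxAttempts times when it times out,
    // e.g. while a newly elected group coordinator is still loading offsets.
    // Returns true if some attempt succeeded, false otherwise.
    static boolean commitWithRetry(Committer committer, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                committer.commit();
                return true;
            } catch (CommitTimeoutException e) {
                System.err.println("Commit attempt " + attempt + " timed out: " + e.getMessage());
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake committer: times out twice, then succeeds.
        boolean ok = commitWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new CommitTimeoutException("coordinator is loading");
            }
        }, 5, 100);
        System.out.println("committed=" + ok + " after " + calls[0] + " attempts");
    }
}
```

Since poll() is not called while retrying, the total retry time should stay well below max.poll.interval.ms. If every attempt fails, the loop should keep polling rather than die: the next successful commitSync() commits the latest positions anyway, and at worst some records are redelivered after a rebalance.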
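I hadn't handled this transient error. What made it difficult to debug was that I had spawned my consumer on a separate thread from the main thread. There was no exception handling in the consumer thread, nor was I examining the Future object in the main thread. As a result, the TimeoutException was not logged either. Because the exception ended the poll loop, poll() was never called again, so once max.poll.interval.ms (5 minutes by default) had elapsed, the client's background heartbeat thread sent the LeaveGroup request seen in the logs.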
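The silent failure comes from how ExecutorService.submit() behaves (assuming the consumer thread was started that way): an exception thrown by the task is captured in the returned Future and only resurfaces, wrapped in an ExecutionException, when someone calls get(). A minimal JDK-only sketch of guarding against that, where the Runnable passed in stands in for the real poll loop and runAndReport is an illustrative name, logs inside the worker and also blocks on the Future in the main thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConsumerThreadSupervisor {
    // Runs consumerLoop (a stand-in for the real poll loop) on a worker thread
    // and reports how it ended, so a failure is never silently swallowed.
    static String runAndReport(Runnable consumerLoop) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    consumerLoop.run();
                } catch (RuntimeException e) {
                    // Log in the worker itself: otherwise submit() just stores the
                    // exception in the Future and nothing is printed anywhere.
                    System.err.println("Consumer loop died: " + e);
                    throw e;
                }
            });
            // get() blocks until the loop ends and rethrows its failure
            // wrapped in an ExecutionException.
            future.get();
            return "completed";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Simulate the unhandled commit timeout escaping the poll loop.
        String result = runAndReport(() -> {
            throw new IllegalStateException("commit timed out");
        });
        System.out.println(result);
    }
}
```

With both safeguards in place, an escaped TimeoutException shows up in the logs immediately instead of surfacing five minutes later as a poll-timeout LeaveGroup.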