python, apache-kafka, kafka-consumer-api, rebalancing

Python Apache Kafka CommitFailedError due to group rebalance


I have several Kafka consumers receiving messages from the same topic. Each message indicates which consumer is responsible for processing it. Every consumer commits the message as soon as it receives it, but only processes it if the ID in the message matches its own hostname. Note that processing a message takes very long, around 1 hour, because the consumer spawns a subprocess to run other scripts. The frequency of messages from the producer varies, but is normally no more than 20 a day.

Below is the consumer script:

import json
import socket
import ssl
import subprocess

from kafka import KafkaConsumer

context = ssl.create_default_context()
hostname = socket.gethostname()

consumer = KafkaConsumer(
    group_id="group_id",
    bootstrap_servers="localhost:8000",
    security_protocol="SSL",
    ssl_context=context,
    auto_offset_reset="earliest",
)
consumer.subscribe(["my-topic"])

for message in consumer:
    # Commit immediately on receipt, before any processing.
    consumer.commit()
    _message = json.loads(message.value.decode("UTF-8"))

    if _message["hostname"] == hostname:
        # Run the script, which takes about 1 hour.
        subprocess.run(["unreal_engine_process.bat"], capture_output=True, text=True)
        ...

The consumer's second commit sometimes fails with this error:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

I would like to know:

  • Is there any problem with my consumer code, or is it a problem with the Kafka server config?
  • Is it OK to commit at the beginning if I don't have to ensure successful processing of the message? Is the problem caused by the gap between the commit time and the processing time, or is it related to the consumer heartbeat?
  • Is such a processing time (1 hour) too long for Kafka?
  • Does increasing max_poll_interval_ms work? Is it suitable to raise it to several hours?
  • Or any other comments.
  • Should I tune max_poll_interval_ms or max_poll_records?

Many thanks!

I tried committing the message at the beginning of the receive, and a bit later, but the problem is still there. Sometimes a message that was already consumed and committed is consumed and processed again by the consumer. I suspect this is also a message offset and commit problem.


Solution

  1. Neither.

  2. It is OK to commit at the beginning if you don't have to ensure successful processing of the message. The problem is caused by a poll interval that is too long (> max_poll_interval_ms); it is a kind of heartbeat issue.

  3. Yes, it's too long.

  4. Increasing max_poll_interval_ms works, but it's not a good idea.

max_poll_interval_ms is the maximum time the consumer may be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances, reassigning its partitions to another consumer instance. For long batch processing it is fine to increase max.poll.interval.ms, but note that raising it may delay a group rebalance, since a consumer only joins a rebalance inside a call to poll(). A configuration sketch follows.
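For reference, a minimal configuration sketch, reusing the broker address and group from the question. The two-hour figure is an assumption, not a recommendation; it should exceed your worst-case job duration. Lowering max_poll_records is the other remedy the error message itself suggests:

from kafka import KafkaConsumer

# Sketch only: raise the poll interval above the worst-case job duration
# and fetch a single record per poll so one long job cannot exhaust the
# budget for a whole batch. The 2-hour value is an assumption.
consumer = KafkaConsumer(
    group_id="group_id",
    bootstrap_servers="localhost:8000",
    security_protocol="SSL",
    auto_offset_reset="earliest",
    max_poll_interval_ms=2 * 60 * 60 * 1000,  # 2 hours
    max_poll_records=1,
)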

In your case, you can do the job asynchronously: hand the long-running work to a separate thread and keep calling poll() as often as possible, as in the sketch below.
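A minimal sketch of that pattern with kafka-python, reusing the topic, group, and batch script from the question. The long job runs in a worker thread; meanwhile the loop pauses the assigned partitions and keeps calling poll() so the poll timer never exceeds max_poll_interval_ms:

import json
import socket
import subprocess
from concurrent.futures import ThreadPoolExecutor

from kafka import KafkaConsumer

hostname = socket.gethostname()
consumer = KafkaConsumer(
    group_id="group_id",
    bootstrap_servers="localhost:8000",
    security_protocol="SSL",
    auto_offset_reset="earliest",
)
consumer.subscribe(["my-topic"])

executor = ThreadPoolExecutor(max_workers=1)
job = None  # future for the currently running job, if any

def run_job():
    # The ~1 hour of work happens off the polling thread.
    return subprocess.run(["unreal_engine_process.bat"],
                          capture_output=True, text=True)

while True:
    # poll() is called at least once per second, so the consumer never
    # exceeds max_poll_interval_ms; paused partitions return no records.
    batches = consumer.poll(timeout_ms=1000)

    if job is not None and job.done():
        consumer.resume(*consumer.paused())  # job finished, fetch again
        job = None

    for messages in batches.values():
        for message in messages:
            consumer.commit()  # commit on receipt, as in the question
            _message = json.loads(message.value.decode("UTF-8"))
            if _message["hostname"] == hostname and job is None:
                # Stop fetching while the job runs in the worker thread.
                consumer.pause(*consumer.assignment())
                job = executor.submit(run_job)

A real implementation would queue matching messages instead of relying on a single future (a matching message arriving in the same batch while a job is running is skipped here), but this shows the pause/poll/resume idea.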

If your jobs must be processed in order, increase max_poll_interval_ms to several hours. In that situation, once your consumer goes down, its partitions will not be reassigned to other consumers, so you should monitor your consumers to prevent partition starvation; see the sketch after this paragraph.
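As a starting point for that monitoring, here is a sketch of a lag check with kafka-python. It reuses the group id only to read committed offsets; it never subscribes or polls, so it does not join the group or trigger a rebalance:

from kafka import KafkaConsumer, TopicPartition

# Sketch: compare the group's committed offsets with the log-end offsets.
# Steadily growing lag on a partition suggests its consumer is down or stuck.
monitor = KafkaConsumer(
    group_id="group_id",
    bootstrap_servers="localhost:8000",
    security_protocol="SSL",
    enable_auto_commit=False,
)
partitions = [TopicPartition("my-topic", p)
              for p in monitor.partitions_for_topic("my-topic")]
end_offsets = monitor.end_offsets(partitions)
for tp in partitions:
    committed = monitor.committed(tp) or 0
    print(tp, "lag =", end_offsets[tp] - committed)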