apache-kafka · kafka-consumer-api · confluent-kafka-python

The last messages are not consumed from a kafka topic even though they should be, leaving a constant consumer lag


I have a very basic Kafka consumer which needs to consume data from a 32-partition topic with a large amount of data on each partition.

It manages to consume most of the data from that topic, but towards the end of each partition it stalls, keeping a small constant lag instead of reaching the latest offset.

Every time I restart my consumer, it drains a few of those partitions down to a lag of 0, but never all of them.

Here is the smallest consuming code that reproduces this error:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "group.id": "group-id",
})

consumer.subscribe(["topic"])

while True:
    batch = consumer.consume(timeout=1, num_messages=100)
    if batch:
        # Commit the offset of the last message in the batch
        consumer.commit(batch[-1])

Solution

  • After explicitly setting fetch.min.bytes to 1 to make sure the broker was not holding back data, and after refactoring my original code, I noticed that I was only committing the last message received in each batch. I had subconsciously assumed that all messages in one batch came from the same partition, but that is wrong: a batch returned by consume() can mix messages from several partitions, so committing only the last message advances the offset of a single partition and leaves the others lagging.

    Committing an offset for every partition that contributed at least one message to the batch fixed the issue:

    # Keep only the last message seen per partition; consume() preserves
    # per-partition ordering, so that message has the highest offset
    # for its partition.
    partitions_to_commit = {m.partition(): m for m in batch}
    for message in partitions_to_commit.values():
        consumer.commit(message)
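
    The dict comprehension relies on consume() returning messages in offset order within each partition, so the last message kept per partition is the one with the highest offset. A minimal sketch of that deduplication logic, using a hypothetical StubMessage stand-in for confluent_kafka.Message so it runs without a broker:

    ```python
    class StubMessage:
        """Minimal stand-in for confluent_kafka.Message (illustration only)."""
        def __init__(self, partition, offset):
            self._partition = partition
            self._offset = offset

        def partition(self):
            return self._partition

        def offset(self):
            return self._offset


    def last_message_per_partition(batch):
        # Later messages overwrite earlier ones for the same partition key,
        # so each surviving message carries the highest offset seen for
        # its partition.
        return {m.partition(): m for m in batch}


    # A batch mixing messages from partitions 0 and 1, in per-partition order
    batch = [StubMessage(0, 10), StubMessage(1, 7), StubMessage(0, 11)]
    to_commit = last_message_per_partition(batch)
    # to_commit keeps offset 11 for partition 0 and offset 7 for partition 1
    ```

    Committing only batch[-1] here would advance partition 0 alone, which is exactly the stuck-lag symptom described above.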