Tags: python, apache-kafka, kafka-consumer-api, pykafka

Kafka consumers on different partitions in the same group are still intermittently consuming the same messages


I have one consumer group with 5 consumers. The topic also has 5 partitions, so each consumer is assigned exactly one partition.

The CLI confirms this:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1  TopicId: kJqfk1FoRSWtkkjfsgw9FSg    PartitionCount: 5   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: Topic-1  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 4    Leader: 0   Replicas: 0 Isr: 0

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 --from-beginning --partition {n}

Running this for each partition correctly shows different messages per partition.

However, I often see two or more consumers working on the same message, and being new to Kafka, I'm not able to figure out the problem.

I am using pykafka to consume messages:

import json
import pprint
import traceback

from pykafka import KafkaClient

# kafka_broker_url, kafka_broker_port, kafka_topic, CONSUMER_GROUP_NAME,
# the SQL_* settings, get_db_session and module_logger are defined elsewhere.

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        # Get a DB session object
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)

            module_logger.info("Created a Kafka Consumer")
        except Exception:
            module_logger.error('Exception while connecting to Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*"*10 + " Starting to Consume Messages " + "*"*10)
        while True:
            for msg in self._consumer:
                # Commit the offset first, then process the message.
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug("\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                # Logic for processing messages (takes anywhere between
                # 10 minutes and 4 hours for the task to complete)
                self.process_message(message)
        self._consumer.close()

Solution

  • Print the partition and offset of the messages (see the first sketch below). You should see they are, in fact, unique events you're processing.

    If those are the same, the "10 min to 4 hr" processing step is very likely causing a consumer group rebalance: Kafka requires the consumer to keep polling within a configured interval (for the Java client, max.poll.interval.ms defaults to 5 minutes), which a task that runs for hours will exceed. You're experiencing at-least-once processing semantics, and therefore need to handle duplicates on your own (see the second sketch below).

    I see you're using a database client in your code, so the recommendation would be to use the Kafka Connect framework rather than writing your own consumer.
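
For the first point, here is a minimal sketch of what that logging could look like with pykafka (the broker address, topic, and group name are placeholders): every pykafka message exposes partition_id and offset, which together uniquely identify a record.

    from pykafka import KafkaClient

    client = KafkaClient("localhost:9092")  # placeholder broker address
    topic = client.topics[b"Topic-1"]       # placeholder topic name
    consumer = topic.get_simple_consumer(consumer_group=b"my-group")  # placeholder group

    for msg in consumer:
        # True duplicates show up as identical (partition, offset) pairs;
        # distinct offsets mean the events only look alike.
        print(f"partition={msg.partition_id} offset={msg.offset} value={msg.value!r}")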
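
If the pairs do repeat, one common way to handle at-least-once delivery is to record the (partition, offset) of every message you finish and skip anything already seen. A sketch, again with placeholder names; in real use the seen-set should live in your existing MySQL database, updated in the same transaction as the work itself so it survives restarts:

    from pykafka import KafkaClient

    def process_message(raw_value):
        """Placeholder for the real 10 min - 4 hr processing logic."""

    client = KafkaClient("localhost:9092")  # placeholder broker address
    consumer = client.topics[b"Topic-1"].get_simple_consumer(consumer_group=b"my-group")

    processed = set()  # in-memory for the sketch only; persist this in production

    for msg in consumer:
        key = (msg.partition_id, msg.offset)
        if key in processed:
            continue  # duplicate delivery (e.g. after a rebalance): already handled
        process_message(msg.value)
        processed.add(key)         # mark done only after processing succeeds
        consumer.commit_offsets()  # commit after the work, not before it

Committing after processing (instead of before, as in the question's code) means a crash mid-task replays the message rather than losing it, and the (partition, offset) check then makes the replay harmless.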