Tags: python, apache-kafka, consumer, confluent-kafka-python

How to check the number of partitions in Kafka (confluent_kafka)


I am trying to run a batch ETL job with the confluent_kafka Python package, scheduled every day at midnight (cron: 0 0 * * *). I know that my topic currently has 4 partitions, but that could change. Is there any way to check the total number of partitions in a specific topic? My consumer looks like this:

import json

from confluent_kafka import Consumer, KafkaError

    # Body of a method on a class that owns self.consumer.
    messages = list()
    partition_counter = 0
    tnof_partition = 4  # hard-coded partition count

    while True:
        msg = self.consumer.poll(0.1)
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        elif not msg.error():
            event = json.loads(msg.value().decode('utf-8'))
            messages.append(event)

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {0}/{1}"
                  .format(msg.topic(), msg.partition()))

            # Stop once every partition has reported end-of-partition.
            partition_counter += 1
            if partition_counter == tnof_partition:
                self.consumer.commit()
                self.consumer.close()
                break

I would also appreciate it if you could show alternative ways to implement a batch consumer. Thanks.


Solution

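  • The Consumer's list_topics() method returns cluster metadata whose topics attribute maps each topic name to a TopicMetadata object. Each TopicMetadata has a partitions dict keyed by partition id, so the length of that dict is the number of partitions in the topic.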

    Ref: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.list_topics
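
As a minimal sketch of the approach above, the helper below reads the partition count from the metadata returned by list_topics(). The function name count_partitions, the 10-second timeout, and the broker address, group id and topic name in the usage comment are all assumptions for illustration and do not come from the question. The helper accepts any client object that exposes list_topics(), such as a confluent_kafka Consumer.

```python
def count_partitions(client, topic, timeout=10.0):
    """Return the number of partitions of `topic`.

    `client` is any object exposing confluent_kafka's list_topics(),
    for example a Consumer. The name and default timeout are illustrative.
    """
    # Ask the cluster for metadata about this one topic only.
    metadata = client.list_topics(topic=topic, timeout=timeout)

    topic_md = metadata.topics.get(topic)
    if topic_md is None:
        raise KeyError("topic {!r} not found in cluster metadata".format(topic))

    # TopicMetadata.error is None on success, otherwise a KafkaError
    # (for example when the topic does not exist).
    if topic_md.error is not None:
        raise RuntimeError(
            "metadata error for {!r}: {}".format(topic, topic_md.error)
        )

    # partitions is a dict keyed by partition id.
    return len(topic_md.partitions)


# Usage (needs a reachable broker; all values below are placeholders):
# from confluent_kafka import Consumer
# consumer = Consumer({"bootstrap.servers": "localhost:9092",
#                      "group.id": "batch-etl"})
# tnof_partition = count_partitions(consumer, "my_topic")
```

The result could replace the hard-coded tnof_partition = 4 in the question's loop, so the end-of-partition check keeps working if the topic is repartitioned between runs.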