I am trying to do batch ETL with the confluent_kafka Python package, scheduled at 0 0 * * * (once a day at midnight). I know my topic currently has 4 partitions, but that could change, so is there a way to look up the total number of partitions for a specific topic? My consumer looks like this:
import json

from confluent_kafka import Consumer, KafkaError

messages = list()
partition_counter = 0
tnof_partition = 4  # total number of partitions, currently hard-coded

while True:
    msg = self.consumer.poll(0.1)
    if msg is None:
        continue
    elif not msg.error():
        event = json.loads(msg.value().decode('utf-8'))
        messages.append(event)  # collect the parsed event for the batch
    elif msg.error().code() == KafkaError._PARTITION_EOF:
        print("End of partition reached {0}/{1}"
              .format(msg.topic(), msg.partition()))
        partition_counter += 1
        # Stop once every partition has been read to its end
        if partition_counter == tnof_partition:
            self.consumer.commit()
            self.consumer.close()
            break
I would also appreciate it if you could show alternative ways to implement a batch consumer. Thanks.
The Consumer's list_topics() method returns cluster metadata whose topics attribute is a dict mapping topic names to TopicMetadata objects; each TopicMetadata in turn has a partitions dict, so its length gives you the number of partitions for that topic.
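For example, a minimal sketch (the bootstrap server, group id, and topic name below are placeholders for your own values):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'group.id': 'batch-etl',                # placeholder consumer group
})

# list_topics() returns ClusterMetadata; its .topics dict maps topic
# names to TopicMetadata, whose .partitions dict holds one entry per
# partition of that topic.
cluster_metadata = consumer.list_topics(topic='my_topic', timeout=10)
topic_metadata = cluster_metadata.topics['my_topic']
tnof_partition = len(topic_metadata.partitions)
print('my_topic has {0} partitions'.format(tnof_partition))

You can then use that value in place of the hard-coded tnof_partition = 4 in your loop, so the partition-EOF counter keeps working if the topic is ever repartitioned.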