Due to pykafka's EOL, we are migrating to confluent-kafka-python. For pykafka
we wrote an elaborate script that produced output in this format:
topic | consumer group | offset |
---|---|---|
topic_alpha | total_messages | 100 |
topic_alpha | consumer_a | 10 |
topic_alpha | consumer_b | 25 |
Is there Python code that does something similar with confluent-kafka-python?
Small print: there is a partial example of how to read offsets for a given consumer group. However, I struggle to get the list of consumer groups
per topic without manually parsing `__consumer_offsets`.
Use `admin_client.list_groups()` to get a list of groups, `admin_client.list_topics()` to get all topics and partitions in the cluster, and `client.get_watermark_offsets()` (a `Consumer` method) to get the low and high watermarks for the given topics.
Then, for each consumer group, instantiate a new consumer with the corresponding `group.id`, create a `TopicPartition` list to query committed offsets for, and call `c.committed()` to retrieve the committed offsets.
Subtract the committed offsets from the high watermark to get th