
confluent-kafka-python library: read offsets per topic per consumer_group


Because pykafka has reached EOL, we are migrating to confluent-kafka-python. For pykafka we wrote an elaborate script that produced output in the following format:

topic        consumer_group  offset
topic_alpha  total_messages  100
topic_alpha  consumer_a      10
topic_alpha  consumer_b      25

Is there existing Python code that does something similar with confluent-kafka-python?

Small print: there is a partial example of how to read offsets for a given consumer group. However, I struggle to get the list of consumer groups per topic without manually parsing __consumer_offsets.


Solution

  • Use admin_client.list_groups() to get a list of groups, admin_client.list_topics() to get all topics and partitions in the cluster, and a consumer's get_watermark_offsets() to get the low and high watermarks of each partition of the given topics.

    Then, for each consumer group, instantiate a new consumer with the corresponding group.id, build a list of TopicPartition objects to query, and call consumer.committed() to retrieve the committed offsets. Subtract the committed offsets from the high watermarks to get the consumer lag.
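A minimal sketch of the steps above, assuming confluent-kafka-python's AdminClient.list_groups(), AdminClient.list_topics(), Consumer.get_watermark_offsets() and Consumer.committed(). The helper names, the "__" prefix used to skip internal topics, the probe group id "offset-report-probe", and summing offsets over partitions to get one number per topic are all illustrative assumptions, not the definitions used by the original pykafka script. A group is treated as consuming a topic when it has committed at least one offset there, which sidesteps parsing __consumer_offsets.

```python
def list_group_ids(admin):
    """Return the ids of all groups known to the cluster, sorted."""
    # AdminClient.list_groups() yields GroupMetadata objects; .id is the group name.
    # Newer confluent-kafka releases deprecate it in favour of list_consumer_groups().
    return sorted(g.id for g in admin.list_groups(timeout=10))


def partitions_by_topic(admin, include_internal=False):
    """Map each topic name to a sorted list of its partition ids."""
    metadata = admin.list_topics(timeout=10)  # ClusterMetadata
    return {
        name: sorted(topic.partitions)  # TopicMetadata.partitions is keyed by partition id
        for name, topic in metadata.topics.items()
        # Assumption: a "__" prefix marks internal topics such as __consumer_offsets.
        if include_internal or not name.startswith("__")
    }


def high_watermarks(consumer, topic, partitions, timeout=10.0):
    """Return {partition: high watermark}, the offset the next produced message will get."""
    from confluent_kafka import TopicPartition  # lazy import, see note below

    result = {}
    for p in partitions:
        _low, high = consumer.get_watermark_offsets(TopicPartition(topic, p), timeout=timeout)
        result[p] = high
    return result


def committed_offsets(bootstrap_servers, group_id, topic, partitions, timeout=10.0):
    """Return {partition: committed offset} for one group, omitting uncommitted partitions."""
    from confluent_kafka import Consumer, TopicPartition  # lazy import, see note below

    # This consumer never subscribes, so it does not join the group;
    # auto-commit is disabled so it never writes offsets either.
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,
    })
    try:
        tps = [TopicPartition(topic, p) for p in partitions]
        committed = consumer.committed(tps, timeout=timeout)
    finally:
        consumer.close()
    # A negative offset (OFFSET_INVALID) means the group has no commit on that partition.
    return {tp.partition: tp.offset for tp in committed if tp.offset >= 0}


def lag_rows(topic, highs, committed_by_group):
    """Build (topic, group, committed, lag) rows, each value summed over partitions.

    Groups with no committed offset on this topic are skipped, which is how the
    group-to-topic mapping is derived without reading __consumer_offsets.
    """
    rows = []
    for group, committed in sorted(committed_by_group.items()):
        if not committed:
            continue
        total = sum(committed.values())
        lag = sum(highs[p] - offset for p, offset in committed.items())
        rows.append((topic, group, total, lag))
    return rows


def report(bootstrap_servers):
    """Print topic, consumer group, committed offset and lag for every group and topic."""
    from confluent_kafka import Consumer
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    groups = list_group_ids(admin)
    # Hypothetical throwaway group id, used only for watermark queries.
    probe = Consumer({"bootstrap.servers": bootstrap_servers, "group.id": "offset-report-probe"})
    try:
        for topic, partitions in partitions_by_topic(admin).items():
            highs = high_watermarks(probe, topic, partitions)
            by_group = {g: committed_offsets(bootstrap_servers, g, topic, partitions) for g in groups}
            for row in lag_rows(topic, highs, by_group):
                print("{:<20} {:<20} {:>10} {:>10}".format(*row))
    finally:
        probe.close()
```

Against a running cluster, report("localhost:9092") would print one row per (topic, group) pair; the address is a placeholder. The confluent_kafka imports sit inside the functions so the pure helpers can be exercised with stand-in objects. Opening one short-lived consumer per group per topic keeps the sketch simple but is chatty on clusters with many groups; querying all of a group's partitions in a single committed() call would cut that down.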