
How can I list the Kafka consumer lag and latest offset per partition and consumer group using the Kafka .NET client API?


I'm trying to get the consumer lag using the .NET client Confluent.Kafka 1.4.0-RC1 (targeting net472). I can get the desired result with this command:

$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group Grp1 --describe

Resulting:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
Grp1            test3           1          15              15              0               rdkafka-ca76855f-7b66-4bf1-82bc-73e9a1c1cf71 /10.186.129.93  rdkafka
Grp1            test3           2          13              13              0               rdkafka-d64379dc-881a-4f6f-a793-51e832cc2f5a /10.186.129.93  rdkafka
Grp1            test3           0          9               9               0               rdkafka-a25bdb80-3b70-4e42-963e-d41ad9e2a99a /10.186.129.93  rdkafka
Grp1            test            0          68              68              0               -                                            -               -

I'm not able to get a similar report from the .NET client code. Here's the code I've been trying; it outputs nothing because the consumer.Assignment property is an empty collection.

private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
    // kafka-console-consumer.bat --zookeeper MW45670117:2380 --topic powertelemetry --consumer-property group.id=test123 --consumer-property enable.auto.commit=true

    StringBuilder sb = new StringBuilder();
    sb.AppendLine("\n");
    sb.AppendLine("Consumer Group Lag Report");
    sb.AppendLine("-------------------------");
    ConsumerConfig config = new ConsumerConfig {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
        foreach (TopicPartition tp in consumer.Assignment) {
            string topic = tp.Topic;
            int partitionID = tp.Partition.Value;
            // gets the current position (offset) for the specific topic/partition
            Offset offset = consumer.Position(new TopicPartition(topic, new Partition(partitionID)));
            sb.AppendLine($"Offset value is: {offset.Value}");

            // returns the current committed offsets for the current assignment
            List<TopicPartitionOffset> tpos = consumer.Committed(TimeSpan.FromSeconds(4));
            foreach (TopicPartitionOffset tpo in tpos) {
                sb.AppendLine($"Committed offset for partition {tpo.TopicPartition} is {tpo.Offset}");
            }
        }
    }

    return sb.ToString();
}

I'm looking for both the consumer lag and the latest offset per partition and consumer group.


Solution

  • OK, I finally found how to do it. You need to assign the topic partitions to the consumer explicitly with Assign(); a freshly built consumer has an empty Assignment, which is why the loop in the question never ran. In my case the topic was test3 with 3 partitions, and these are hard-coded below. A more generic implementation would pass the List<TopicPartition> associated with the group into this method. Here is how I get the consumer lag:

    // Requires: using System; using System.Collections.Generic; using System.Text; using Confluent.Kafka;
    private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
        StringBuilder sb = new StringBuilder();
        sb.AppendLine("\n");
        sb.AppendLine("Consumer Group Lag Report");
        sb.AppendLine("-------------------------");
        ConsumerConfig config = new ConsumerConfig {
            BootstrapServers = bootstrapServers,
            GroupId = consumerGroupName,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
            // Assignment is empty on a new consumer, so assign the partitions explicitly.
            List<TopicPartition> topicPartitions = new List<TopicPartition> {
                new TopicPartition("test3", new Partition(0)),
                new TopicPartition("test3", new Partition(1)),
                new TopicPartition("test3", new Partition(2))
            };

            consumer.Assign(topicPartitions);

            // Committed offsets of this group for the partitions assigned above.
            List<TopicPartitionOffset> tpos = consumer.Committed(TimeSpan.FromSeconds(40));
            foreach (TopicPartitionOffset tpo in tpos) {
                // The high watermark is the log end offset of the partition.
                WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(40));
                long logEndOffset = w.High.Value;
                if (tpo.Offset == Offset.Unset) {
                    // The group has not committed anything for this partition, so there is no lag to compute.
                    sb.AppendLine($"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; Watermark end offset is {logEndOffset}");
                    continue;
                }
                long committed = tpo.Offset.Value;
                long lag = logEndOffset - committed;
                sb.AppendLine($"Committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {committed} out of Watermark end offset {logEndOffset} Lag is: {lag}");
            }

            sb.AppendLine();
        }

        return sb.ToString();
    }
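
    The more generic variant mentioned above could look roughly like the sketch below, where the caller passes in the partitions instead of hard-coding test3. The class name ConsumerLagReport and the methods ComputeLag and Write are made-up names for illustration, not part of Confluent.Kafka. The sketch assumes the Committed overload that takes an explicit partition list, so no Assign() call should be needed, and it has not been run against a live broker.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Text;
    using Confluent.Kafka;

    public static class ConsumerLagReport
    {
        // Lag = log end offset - committed offset.
        // Returns null when there is no committed offset (negative sentinel such as Offset.Unset).
        public static long? ComputeLag(long committed, long logEndOffset)
        {
            if (committed < 0) return null;
            return logEndOffset - committed;
        }

        // Builds one report line per partition passed in by the caller.
        public static string Write(string bootstrapServers, string consumerGroupName,
                                   IEnumerable<TopicPartition> partitions, TimeSpan timeout)
        {
            var sb = new StringBuilder();
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = consumerGroupName,
                EnableAutoCommit = false // this consumer only reads offsets, it should never commit
            };

            using (var consumer = new ConsumerBuilder<Ignore, Ignore>(config).Build())
            {
                // Ask the broker for the group's committed offsets on exactly these partitions.
                List<TopicPartitionOffset> committed = consumer.Committed(partitions, timeout);
                foreach (TopicPartitionOffset tpo in committed)
                {
                    // The high watermark is the log end offset of the partition.
                    WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, timeout);
                    long? lag = ComputeLag(tpo.Offset.Value, w.High.Value);
                    string lagText = lag.HasValue ? lag.Value.ToString() : "-";
                    sb.AppendLine($"{consumerGroupName} {tpo.Topic} {tpo.Partition.Value} " +
                                  $"committed={tpo.Offset.Value} end={w.High.Value} lag={lagText}");
                }
            }

            return sb.ToString();
        }
    }
    ```

    It would be called with something like ConsumerLagReport.Write("localhost:9092", "Grp1", new List<TopicPartition> { new TopicPartition("test3", new Partition(0)) }, TimeSpan.FromSeconds(10)). Keeping the subtraction in a separate ComputeLag method makes the "no committed offset" case explicit and lets the arithmetic be checked without a broker.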