How to consume from a specific TopicPartitionOffset with Confluent.Kafka in .NET


I need my consumer to start consuming from a specific TopicPartitionOffset (here, offset 278 on partition 0). Assume that a producer has already written messages to the topic "Test_1". Here is my code:

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

At the line var cr = consumer.Consume(); the call runs, but nothing happens: no message is ever printed. What is the problem?

I have already tried setting AutoOffsetReset = AutoOffsetResetType.Earliest in ConsumerConfig. With that setting the consumer reads every message from every offset, which is not what I'm looking for.


Solution

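  • Solved. I found a solution, described below:

    • Added this before trying to consume:

    consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)))

    • Removed these:

    consumer.Subscribe("Test_1") and consumer.Seek(...)

    Subscribe (partitions handed out by the consumer group) and Assign (partitions chosen manually) are two alternative ways of selecting partitions, and Assign with a TopicPartitionOffset already sets the starting offset, so neither Subscribe nor Seek is needed here.

    The updated code looks something like this, and it works: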

    using (var consumer = new Consumer<Ignore, string>(config))
    {
        // Manually assign partition 0 and start reading at lastConsumedOffset.
        consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)));

        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume();
                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error}");
            }
        }

        // Note: unreachable as written, because the loop above never exits.
        consumer.Close();
    }
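
The code in this thread creates the consumer with new Consumer<...>(config), which comes from a pre-1.0 release of Confluent.Kafka. In 1.0 and later, consumers are created through ConsumerBuilder instead. The following is a minimal sketch of the same Assign-based approach on the newer API, not a definitive implementation. The class name, the start offset constant, and the Ctrl+C handling are assumptions added for illustration; the topic, broker address, and group id are taken from the question.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ConsumeFromOffsetSketch
{
    public static void Main()
    {
        // Values from the question; adjust for your own cluster.
        const string topicName = "Test_1";
        const long startOffset = 278;

        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "this-group-id",
            EnableAutoCommit = false
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // Assumption: stop on Ctrl+C so that Close() below is reachable.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            // Manual assignment: partition 0, starting at startOffset.
            // No Subscribe() and no Seek() are used.
            consumer.Assign(new TopicPartitionOffset(
                topicName, new Partition(0), new Offset(startOffset)));

            try
            {
                while (true)
                {
                    try
                    {
                        var result = consumer.Consume(cts.Token);
                        Console.WriteLine(
                            $"Received message at {result.TopicPartitionOffset}: {result.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Thrown by Consume(cts.Token) once Ctrl+C cancels the token.
            }
            finally
            {
                // Leave the group cleanly before the consumer is disposed.
                consumer.Close();
            }
        }
    }
}
```

Passing a cancellation token to Consume is one way to give the loop an exit, so the Close() call actually runs, unlike in the loop above.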