c#, apache-kafka, kafka-consumer-api, confluent-kafka-dotnet

How can I consume just the last (newest) message from a Kafka topic with Kafka Confluent Consumer in C#?


I have the following code for reading data from a Kafka topic. My goal is to periodically read only the last (newest) message in the topic, because I want to use the data in a live chart. But when I run the code, it starts reading from somewhere in the past (24 hours ago). I think I have to define something like an offset in my code? How can I do that with a Kafka Confluent consumer?

public void Read_from_Kafka()
{
try
{
    var config = new ConsumerConfig
    {
        BootstrapServers = kafka_URI,
        GroupId = "group",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        SecurityProtocol = SecurityProtocol.Ssl,
        SslCaLocation = "path1",
        SslCertificateLocation = "path2",
        SslKeyLocation = "path3",
        SslKeyPassword = "password"
    };

    CancellationTokenSource source = new CancellationTokenSource();
    CancellationToken cancellationToken = source.Token;

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe(topic_name);
        while (!cancellationToken.IsCancellationRequested)                    
        {
            var consumeResult = consumer.Consume(cancellationToken);
            Kafka_message_total = consumeResult.Message.Value;

            using (StreamWriter sw = File.AppendText(json_log_file))
            {
                sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
            }                        
            System.Threading.Thread.Sleep(2000);
        }
        consumer.Close();
    }
    using (StreamWriter sw = File.AppendText(error_log))
    {
        sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
    }
}

catch (Exception ex)
{

    using (StreamWriter sw = File.AppendText(error_log))
    {
        sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
    }
}

}

Update-1

I have tried setting AutoOffsetReset = AutoOffsetReset.Latest, but I am still reading data from the past. I think this setting alone is not enough for my purpose.
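
(A likely explanation for this: AutoOffsetReset only takes effect when the consumer group has no committed offset for the partition. Since the group "group" has already committed offsets, the consumer resumes from those and the setting is ignored. A quick way to check this, sketched against the config above, is to use a fresh, unique GroupId so that no committed offsets exist:)

// Sketch only: with a brand-new, unique GroupId there are no committed
// offsets, so AutoOffsetReset.Latest actually applies and consumption
// starts at the end of the topic instead of resuming old offsets.
var config = new ConsumerConfig
{
    BootstrapServers = kafka_URI,          // same connection settings as before
    GroupId = Guid.NewGuid().ToString(),   // fresh group => no committed offsets
    AutoOffsetReset = AutoOffsetReset.Latest,
};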


Solution

  • I don't think the Seek method works very well in the .NET confluent-kafka-dotnet package, and it's suggested to use Assign with a TopicPartitionOffset instead.

    The code below finds the last message on topic 'purchases' if you only have one partition, partition 0:

    using Confluent.Kafka;
    using System;
    
    class Consumer
    {
        static void Main(string[] args)
        {
            ConsumerConfig config = new()
            {
                BootstrapServers = "localhost:9092",
                GroupId = Guid.NewGuid().ToString(),
            };
            using IConsumer<string, string> consumer 
                = new ConsumerBuilder<string, string>(config).Build();
            try
            {
                TopicPartition topicPartition = new("purchases", new Partition(0));
                WatermarkOffsets watermarkOffsets 
                    = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));
                TopicPartitionOffset topicPartitionOffset 
                    = new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
                consumer.Assign(topicPartitionOffset);
                ConsumeResult<string, string> consumeResult 
                    = consumer.Consume(TimeSpan.FromSeconds(3));
                // Consume returns null if nothing arrives before the timeout
                // (for example, if the topic is empty).
                if (consumeResult != null)
                {
                    Console.Write($"Last message value = {consumeResult.Message.Value}, " +
                        $"position = {consumer.Position(topicPartition)}");
                }
            }
            finally { consumer.Close(); }
        }
    }
    

    I've used timeouts rather than cancellation tokens here, but switching to cancellation tokens shouldn't affect the logic.

    There are some warnings that in certain circumstances Kafka doesn't use offsets in quite such a simple way (for example, transactional producers write control records that occupy offsets, and log compaction can leave gaps), so simply taking one off the high watermark offset for the partition may not always land on a readable message. In the examples I've tested it has worked fine, though.

    If you have multiple partitions, the code can be extended to find the last message in each partition by iterating and changing new Partition(0) to the appropriate index, e.g. new Partition(1) for partition 1. With multiple partitions, Kafka itself doesn't know which message was written last: ordering is only guaranteed within a partition. However, once you have the last message from each partition, you may have some message property you can use to work this out (an increasing ID, a timestamp).
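
    A rough sketch of that extension, assuming the broker-assigned message timestamp is an acceptable tie-breaker across partitions (the topic name "purchases", the localhost broker address, and the three-second timeouts are the same placeholders as above):

    using Confluent.Kafka;
    using System;
    
    class LastMessageFinder
    {
        static void Main()
        {
            const string topic = "purchases";
            const string brokers = "localhost:9092";
            ConsumerConfig config = new()
            {
                BootstrapServers = brokers,
                GroupId = Guid.NewGuid().ToString(),
            };
            // Discover how many partitions the topic has.
            using IAdminClient admin = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = brokers }).Build();
            int partitionCount = admin.GetMetadata(topic, TimeSpan.FromSeconds(3))
                .Topics[0].Partitions.Count;
    
            using IConsumer<string, string> consumer
                = new ConsumerBuilder<string, string>(config).Build();
            try
            {
                ConsumeResult<string, string> newest = null;
                for (int p = 0; p < partitionCount; p++)
                {
                    TopicPartition tp = new(topic, new Partition(p));
                    WatermarkOffsets w
                        = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(3));
                    if (w.High.Value == 0) continue;  // partition is empty
                    consumer.Assign(new TopicPartitionOffset(tp, new Offset(w.High.Value - 1)));
                    ConsumeResult<string, string> r = consumer.Consume(TimeSpan.FromSeconds(3));
                    // Keep whichever candidate carries the newest timestamp.
                    if (r != null && (newest == null ||
                        r.Message.Timestamp.UnixTimestampMs > newest.Message.Timestamp.UnixTimestampMs))
                    {
                        newest = r;
                    }
                }
                if (newest != null)
                    Console.WriteLine($"Newest message: {newest.Message.Value} " +
                        $"(partition {newest.Partition.Value}, offset {newest.Offset.Value})");
            }
            finally { consumer.Close(); }
        }
    }

    Comparing timestamps is itself an assumption: it only works if producer clocks (or broker log-append timestamps) are trustworthy for your ordering needs.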