Search code examples
apache-kafkakafka-consumer-apilibrdkafka

Return from Kafka consumer when there is no message


I want to process a topic in application startup using Confluent dotnet client. Assume following example:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

When there is no new message in Kafka, c.Consume will be blocked. Because I want to use it for application startup (Like cache warm up) I want to proceed my code when I found there is no new message.

I know there is an overload for setting timeout like c.Consume(timeout) but the problem with this approach is that if you have a message in your topic and the time duration of reading the message was more than your timeout, You receive null output which is not desirable.


Solution

  • The consumer(s) is not supposed to be aware of the producer(s).

    Now if you want to know that you have read everything in the topic from the moment you start to consume, you can:

    1. Load the newest offset before starting to consume.
    2. Then start consuming messages.
    3. If the message's offset is the same as the newest offset you loaded before, stop consuming.

    I'm not a C# developper but from what I read in the dotnet confluent doc you can call QueryWatermarkOffsetson the consumer to get oldest and newest offset. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_

    And then, on the Messageclass you have an Offset accessor. So the whole thing should not be too hard to achieve. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset