Tags: c#, .net-core, apache-kafka, librdkafka, confluent-kafka-dotnet

Kafka: Consume partition with manual batching - Messages are being skipped


I am using Confluent Kafka .NET to create a consumer for a partitioned topic.

Since Confluent Kafka .NET does not support consuming in batches, I built a function that consumes messages until the batch size is reached. The idea is to build batches containing messages from a single partition only, so I stop building the batch as soon as I consume a result from a different partition and return whatever messages I was able to collect up to that point.

Goal or objective: I want to process the messages returned in the batch and commit the offsets for those messages only. For example:

Partition consumed from | Offset | Stored in batch
           0            |   0    | Yes
           0            |   1    | Yes
           2            |   0    | No

From the table above, I would like to process both messages I got from partition 0. The message from partition 2 would be ignored and (hopefully) picked up later in another call to ConsumeBatch.

To commit, I simply call the synchronous Commit function, passing the offset of the latest message I processed. In this case I would pass the offset of the second message of the batch shown in the table above (partition 0, offset 1).
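For reference, the calling side might look like the sketch below. Process is a hypothetical application handler, _consumer is assumed to be the same IConsumer<string, string> used by ConsumeBatch, and the Commit(ConsumeResult) overload commits the message's offset + 1, i.e. the position the group should resume from:

```csharp
// Sketch of the caller (assumes using System.Linq for ToList()).
var batch = ConsumeBatch(batchSize: 100).ToList();
if (batch.Count > 0)
{
    foreach (var result in batch)
        Process(result); // hypothetical application-specific handler

    // Commit(ConsumeResult) synchronously commits this message's offset + 1,
    // so the consumer group resumes right after the last processed message.
    _consumer.Commit(batch[batch.Count - 1]);
}
```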

ISSUE:

The problem is that when I build a batch like the one shown above, the messages I decide not to process because of validations are skipped forever: message 0 of partition 2 is never picked up by the consumer again.

As you can see in the consumer configuration below, I have set both EnableAutoCommit and EnableAutoOffsetStore to false. I thought this would be enough for the consumer not to touch the offsets at all, so that ignored messages would be picked up by a later Consume call, but it isn't. The consumer's position somehow advances to the latest consumed message for each partition, regardless of my configuration.
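That behavior can be illustrated with a toy model (an assumption for illustration, not the real Confluent client): the consumer keeps an in-memory fetch position per partition that advances on every Consume, independently of stored or committed offsets. Disabling auto-commit and auto-offset-store only affects what gets persisted, not this position:

```csharp
using System;

// Toy model of a consumer's per-partition state. The fetch position
// advances on every Consume regardless of EnableAutoCommit /
// EnableAutoOffsetStore; only Seek moves it backwards.
class PartitionState
{
    public long Position;      // next offset Consume will fetch
    public long? StoredOffset; // what StoreOffset/Commit would persist

    public long Consume() => Position++;                // always advances
    public void Seek(long offset) => Position = offset; // explicit rewind
}

class Demo
{
    static void Main()
    {
        var partition2 = new PartitionState { Position = 0 };

        long skipped = partition2.Consume(); // offset 0: consumed, then left out of the batch
        Console.WriteLine(partition2.Position); // 1 -> next Consume returns offset 1, skipping 0

        partition2.Seek(skipped); // rewinding the position is the only way to see 0 again
        Console.WriteLine(partition2.Position); // 0
    }
}
```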

Can anybody shed some light on what I am missing here to achieve the desired behavior, if possible?

Simplified version of the function to build the batch:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100); // waits up to 100 ms; returns null on timeout
        
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
                break;
        }
        else
            break;
    }

    return consumedMessages;
}

ConsumerConfig used to instantiate my consumer client:

_consumerConfig = new ConsumerConfig
{
    BootstrapServers = _bootstrapServers,
    EnableAutoCommit = false,
    AutoCommitIntervalMs = 0,
    GroupId = "WorkerConsumers",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoOffsetStore = false,
};

Additional Information: This is being tested with:

  • 1 topic with 6 partitions and replication factor of 2
  • 3 brokers
  • 1 single-threaded consumer client that belongs to a consumer group
  • Local environment with WSL 2 on Windows 10

Solution

  • The key was to use the Seek function to reset the partition's fetch position to a specific offset, so that the ignored message could be picked up again as part of a later batch.

    In the same function above:

    public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
    {
        List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
    
        int latestPartition = -1; // The partition from where we consumed the last message
    
        for (int i = 0; i < batchSize; i++)
        {
            var result = _consumer.Consume(100);
        
            if (result != null)
            {
                if (latestPartition == -1 || result.Partition.Value == latestPartition)
                {
                    consumedMessages.Add(result);
                    latestPartition = result.Partition.Value;
                }
                else
                {
                    // Seek resets this partition's fetch position, so this message,
                    // which is not included in the current batch, will be returned
                    // again by a later Consume call
                    _consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!
                    break;
                }
            }
            else
                break;
        }
    
        return consumedMessages;
    }
    

    I think, in general, if you want to consume a message without advancing the consumer's position in any way (essentially peeking the topic partition), you can call Consume and then use Seek(result.TopicPartitionOffset) to set the position of that topic partition back to where it was before the message was consumed.
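    Following that idea, such a "peek" could be sketched as below. Peek is a hypothetical name, and _consumer is assumed to be an IConsumer<string, string>; note this is not atomic, since between Consume and Seek the position briefly moves forward:

    ```csharp
    // Hypothetical helper: look at the next message without (net) advancing
    // the consumer's fetch position for that partition.
    public ConsumeResult<string, string> Peek(int timeoutMs)
    {
        var result = _consumer.Consume(timeoutMs);
        if (result != null)
            _consumer.Seek(result.TopicPartitionOffset); // rewind so the next Consume returns it again
        return result;
    }
    ```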