Tags: c#, multithreading, concurrency, task-parallel-library, blockingcollection

Messages lost when consuming from BlockingCollection in batches


I tried to write a method that consumes a batch from a BlockingCollection and ran into trouble.

Here's a minimal repro:

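using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

internal class Program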
{
    private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
    private static int _consumed;

    static void Main()
    {
        Task.Run(() => Producer());
        Task.Run(() => Consumer());
        Console.WriteLine("press [ENTER] to check");
        while (true)
        {
            Console.ReadLine();
            Console.WriteLine("consumed: " + _consumed);
        }
    }

    private static void Producer()
    {
        for (var i = 0; i < 5000; i++)
            _bc.Add("msg");
    }

    private static void Consumer()
    {
        foreach (var s in _bc.GetConsumingEnumerable())
        {
            var batchSize = _bc.Count + 1;
            var batch = new List<string>(batchSize) { s };
            while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
                batch.Add(additionalResult);
            _consumed = _consumed + batch.Count;
        }
    }
}

A few messages are lost (not always the same number). If you can't reproduce it, try increasing the number of produced messages.

What I'm trying to achieve is to use the GetConsumingEnumerable method in the consumer (at some point I will call CompleteAdding) while still being able to collect a batch of messages of some size, if they're already present.

Why are messages being lost, and what is the right way to do this?


Solution

  • It's a bug in the consumer loop (not in BlockingCollection). This line

    while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
    

    should be

    while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))
    

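    since the first operand of && is always evaluated, and TryTake has the side effect of removing an item from the collection. In the original order, once the batch is full, TryTake still removes one more item, the size check then fails, and that item is dropped without ever being added to the batch. With the size check first, && short-circuits and TryTake is not called when the batch is already full.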
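
    To check the fix end to end, here is a minimal sketch of the repro with the condition order swapped. The names BatchDemo, ConsumeInBatches and Run are made up for this example and are not from the question. It also assumes the producer calls CompleteAdding when it is done, so the consuming loop terminates and the total can be compared with the number of messages produced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

internal static class BatchDemo
{
    // Hypothetical helper (not from the question): drains the collection in
    // batches and returns the total number of messages consumed.
    public static int ConsumeInBatches(BlockingCollection<string> bc)
    {
        var consumed = 0;
        foreach (var first in bc.GetConsumingEnumerable())
        {
            var batchSize = bc.Count + 1;
            var batch = new List<string>(batchSize) { first };

            // Size check first: && short-circuits, so TryTake only runs
            // (and only removes an item) while the batch still has room.
            while (batch.Count < batchSize && bc.TryTake(out var next))
                batch.Add(next);

            consumed += batch.Count;
        }
        return consumed;
    }

    // Hypothetical driver: produces messageCount items, signals completion,
    // and returns how many items the consumer saw.
    public static int Run(int messageCount)
    {
        using (var bc = new BlockingCollection<string>(1000))
        {
            var consumer = Task.Run(() => ConsumeInBatches(bc));

            for (var i = 0; i < messageCount; i++)
                bc.Add("msg");

            // Lets GetConsumingEnumerable finish once the collection is empty.
            bc.CompleteAdding();

            return consumer.Result;
        }
    }

    private static void Main()
    {
        Console.WriteLine("consumed: " + Run(5000)); // prints "consumed: 5000"
    }
}
```

    With the operands in the original order, the same program would typically report fewer than 5000, because each full batch can silently discard one extra item.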