I tried to write a method that consumes items from a BlockingCollection in batches,
and ran into trouble.
Here's a minimal repro:
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

internal class Program
{
    private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
    private static int _consumed;

    static void Main()
    {
        Task.Run(() => Producer());
        Task.Run(() => Consumer());
        Console.WriteLine("press [ENTER] to check");
        while (true)
        {
            Console.ReadLine();
            Console.WriteLine("consumed: " + _consumed);
        }
    }

    private static void Producer()
    {
        for (var i = 0; i < 5000; i++)
            _bc.Add("msg");
    }

    private static void Consumer()
    {
        // Block until the first item arrives, then grab whatever else is already queued.
        foreach (var s in _bc.GetConsumingEnumerable())
        {
            var batchSize = _bc.Count + 1;
            var batch = new List<string>(batchSize) { s };
            while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
                batch.Add(additionalResult);
            _consumed = _consumed + batch.Count;
        }
    }
}
```
A few messages are lost, and not always the same number of them. If you can't reproduce it, try increasing the number of produced messages.
What I'm trying to achieve is to use the GetConsumingEnumerable
method in the consumer (at some point I will call CompleteAdding)
and to collect the messages that are already present into a batch of up to some size.
What causes the lost messages, and what is the right way to do this?
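Wow. It's a bug. This line

```csharp
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
```

should be

```csharp
while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))
```

because the first condition has a side effect: TryTake removes an item from the collection. Since && evaluates left to right, the original loop takes an item first and only then checks whether the batch is full; if it is full, the item that was just removed never makes it into the batch and is silently dropped. This only happens when the producer adds more items after _bc.Count was read (otherwise the queue is already empty once the batch is full, so TryTake returns false and takes nothing), which is why the number of lost messages varies from run to run.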
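For the goal described in the question, here is a sketch of the corrected consumer pulled out into a reusable helper. ConsumeInBatches is a hypothetical name, not part of the BlockingCollection API. The sketch assumes C# 9 top-level statements and a fixed maximum batch size (100 here, an arbitrary choice) instead of the _bc.Count + 1 snapshot. The producer calls CompleteAdding when it is done, so GetConsumingEnumerable ends and the consumer loop exits.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not a library API): blocks for the first item via
// GetConsumingEnumerable, then drains whatever is already queued, up to
// maxBatchSize items per batch.
IEnumerable<List<T>> ConsumeInBatches<T>(BlockingCollection<T> source, int maxBatchSize)
{
    foreach (var first in source.GetConsumingEnumerable())
    {
        var batch = new List<T>(maxBatchSize) { first };
        // Check the size limit BEFORE TryTake: TryTake removes an item as a
        // side effect, so it must only run when there is room to keep that item.
        while (batch.Count < maxBatchSize && source.TryTake(out var next))
            batch.Add(next);
        yield return batch;
    }
}

const int produced = 5000;
var bc = new BlockingCollection<string>(1000);

var producer = Task.Run(() =>
{
    for (var i = 0; i < produced; i++)
        bc.Add("msg");
    // Lets GetConsumingEnumerable finish once the remaining items are consumed.
    bc.CompleteAdding();
});

var consumed = 0;
var largestBatch = 0;
foreach (var batch in ConsumeInBatches(bc, 100))
{
    consumed += batch.Count;
    largestBatch = Math.Max(largestBatch, batch.Count);
}
producer.Wait();

Console.WriteLine($"consumed: {consumed}, largest batch: {largestBatch}");
```

Because the size check runs before TryTake, every item that gets removed ends up in some batch, so consumed should equal 5000 on every run; the largest batch size depends on thread timing. A fixed maximum also keeps the batch size bounded and independent of a Count value that may already be stale when it is used.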