My BlockingCollection<T> consumer does not process the data remaining in the underlying queue once the producer has called the CompleteAdding() method. The following code is part of the consumer's method:
while (blockingCollection.TryTake(out Item item))
{
    // Do something with item.
}
Instead, I'd like it to process all the remaining items AFTER the producer calls CompleteAdding(), and ONLY THEN exit the loop.
This is not the correct way to consume a BlockingCollection<T>:
while (blockingCollection.TryTake(out Item item))
{
    // Do something with item
}
The above loop exits the moment the blockingCollection is temporarily out of items, because the TryTake overload without a timeout returns false immediately when nothing is available. The correct way is to enumerate GetConsumingEnumerable():
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
    // Do something with item
}
The above loop exits only when the blockingCollection has been marked as complete for adding (by a call to CompleteAdding()) and all remaining items have been processed. If at any moment the blockingCollection is temporarily empty, the loop blocks the current thread, waiting synchronously for either a new item or the CompleteAdding() signal.
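To illustrate the point about remaining items, here is a minimal sketch in which the producer queues everything and calls CompleteAdding() before the consumer even starts. The class name DrainDemo and the helper Run are hypothetical, introduced only for this example, and the items are plain integers rather than the Item type from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DrainDemo
{
    // Hypothetical helper: queues `count` integers, completes adding,
    // and only then starts a consumer that drains the collection.
    public static List<int> Run(int count)
    {
        var blockingCollection = new BlockingCollection<int>();
        var consumed = new List<int>();

        // Producer side: add every item, then signal that no more will come.
        for (int i = 0; i < count; i++)
        {
            blockingCollection.Add(i);
        }
        blockingCollection.CompleteAdding();

        // Consumer side: started after CompleteAdding(), yet it still
        // receives every queued item before the loop ends.
        Task consumer = Task.Run(() =>
        {
            foreach (int item in blockingCollection.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
        });
        consumer.Wait();

        blockingCollection.Dispose();
        return consumed;
    }

    public static void Main()
    {
        List<int> result = Run(5);
        // The default backing store is a ConcurrentQueue<T>, so order is FIFO.
        Console.WriteLine(string.Join(",", result)); // prints 0,1,2,3,4
    }
}
```

The consumer is deliberately started late to make it obvious that CompleteAdding() does not discard anything already in the queue.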
A more specialized loop variant is presented below. When the consumer has been idle for 10 seconds it logs a message, so that you know the consumer is not dead, just idle.
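while (true)
{
    // Take items until none arrives within the timeout
    while (blockingCollection.TryTake(out Item item, TimeSpan.FromSeconds(10)))
    {
        // Do something with item
    }
    // Exit only when adding is complete AND the collection is empty
    if (blockingCollection.IsCompleted) break;
    // Log a heartbeat
}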
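For anyone who wants to try the heartbeat variant, a runnable sketch might look like the following. HeartbeatDemo, Run, and the short timeouts are assumptions chosen so the demo finishes quickly; the heartbeat is a simple Console.WriteLine standing in for real logging.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class HeartbeatDemo
{
    // Hypothetical helper: returns how many items were consumed and
    // how many heartbeats were logged while the consumer was idle.
    public static (int Items, int Heartbeats) Run(TimeSpan idleTimeout, TimeSpan producerDelay)
    {
        var blockingCollection = new BlockingCollection<string>();
        int items = 0;
        int heartbeats = 0;

        Task producer = Task.Run(() =>
        {
            Thread.Sleep(producerDelay); // simulate a quiet period
            blockingCollection.Add("a");
            blockingCollection.Add("b");
            blockingCollection.CompleteAdding();
        });

        while (true)
        {
            // Returns false on timeout, or once the collection is completed and empty.
            while (blockingCollection.TryTake(out string item, idleTimeout))
            {
                items++;
            }
            if (blockingCollection.IsCompleted) break;
            heartbeats++;
            Console.WriteLine("Consumer is idle but alive");
        }

        producer.Wait();
        blockingCollection.Dispose();
        return (items, heartbeats);
    }

    public static void Main()
    {
        var result = Run(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(500));
        Console.WriteLine($"items={result.Items}, heartbeats={result.Heartbeats}");
    }
}
```

The number of heartbeats depends on timing, so it should be treated as approximate; the item count does not, because the loop breaks only once IsCompleted reports that adding is complete and the collection is empty.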