c# multithreading .net-core concurrency producer-consumer

How to process remaining data in the BlockingCollection<> when CompleteAdding() has been called


My consumer does not process the items that remain in the underlying queue of a BlockingCollection<> once the producer has called CompleteAdding(). The following code is part of the consumer's method:

while (blockingCollection.TryTake(out Item item))
{
    //Do something with item.
}

I'd like to change it so that it processes all the remaining items after the producer calls CompleteAdding(), and only then exits the loop.


Solution

  • This is not the correct way to consume a BlockingCollection<T>:

    while (blockingCollection.TryTake(out Item item))
    {
        // Do something with item
    }
    

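    TryTake without a timeout returns false immediately when the collection is empty, so the above loop exits the first time the consumer catches up with the producer, whether or not CompleteAdding() has been called. The correct way is to enumerate the GetConsumingEnumerable():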

    foreach (var item in blockingCollection.GetConsumingEnumerable())
    {
        // Do something with item
    }
    

    The above loop exits only after CompleteAdding() has been called on the blockingCollection and all remaining items have been processed. If the blockingCollection is temporarily empty before that point, the loop blocks the current thread, waiting synchronously for either a new item or the CompleteAdding() signal.

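    A more specialized loop variant is presented below. When no item arrives within 10 seconds it logs a message, so that you know the consumer is not dead but just idle. It exits only when IsCompleted is true, meaning that adding has been completed and the collection is empty.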

    while (true)
    {
        while (blockingCollection.TryTake(out Item item, TimeSpan.FromSeconds(10)))
        {
            // Do something with item
        }
        if (blockingCollection.IsCompleted) break;
    
        // Log a heartbeat
    }
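
To try both consuming patterns end to end, here is a minimal sketch of a single producer and a single consumer sharing a BlockingCollection<int>. The class name ConsumerDemo, its two method names, and the itemCount and idleTimeout parameters are hypothetical and exist only for illustration; the BlockingCollection<T> calls are the ones discussed above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class ConsumerDemo
{
    // Consumes with GetConsumingEnumerable() and returns the items in the order taken.
    public static List<int> RunWithEnumerable(int itemCount)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var consumed = new List<int>();

            // Consumer: blocks while the queue is empty and ends only after
            // CompleteAdding() has been called and the queue has been drained.
            Task consumer = Task.Run(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });

            // Producer: adds everything, then signals that no more items will come.
            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            consumer.Wait(); // Wait for the consumer to drain the remaining items.
            return consumed;
        }
    }

    // Consumes with the TryTake-with-timeout loop and logs a message when idle.
    public static List<int> RunWithHeartbeat(int itemCount, TimeSpan idleTimeout)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var consumed = new List<int>();

            Task consumer = Task.Run(() =>
            {
                while (true)
                {
                    // Returns false on timeout, or at once when the queue is completed and empty.
                    while (queue.TryTake(out int item, idleTimeout))
                    {
                        consumed.Add(item);
                    }
                    if (queue.IsCompleted) break; // Adding completed and nothing left.

                    Console.WriteLine("Consumer is idle, still waiting for items.");
                }
            });

            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            consumer.Wait();
            return consumed;
        }
    }
}
```

Calling ConsumerDemo.RunWithEnumerable(1000) or ConsumerDemo.RunWithHeartbeat(1000, TimeSpan.FromSeconds(10)) should return all 1000 items in both cases, because neither loop exits until the collection is both completed and empty.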