
Graceful exit from BlockingCollection.TakeFromAny when adding to collections completed


I wait for available items in two BlockingCollections in a loop using BlockingCollection.TakeFromAny. Items are added to the collections on other threads. At some point I finish adding to the collections, and the next call to TakeFromAny blocks and never returns.

I tried calling CompleteAdding on both collections, and also calling TakeFromAny with a CancellationToken parameter, but in both cases an exception is thrown:

  • with CompleteAdding, an ArgumentException with the message "All collections are marked as complete with regards to additions. Parameter name: collections"

  • with a CancellationToken, an OperationCanceledException, as expected.

Is it possible to set up the collections so that my code returns from TakeFromAny without an exception, with a return value indicating that no new items will arrive in the underlying collections?

TryTakeFromAny is not suitable for my needs because it returns immediately whenever the collections happen to be empty, even though items will still be added later. I want to block until either the next item is available or no more items will ever be added.


Solution

  • TryTakeFromAny is not suitable for my needs because it returns immediately whenever the collections happen to be empty, even though items will still be added later. I want to block until either the next item is available or no more items will ever be added.

    The TryTakeFromAny() method has overloads that allow you to specify a timeout value. The method will not return until either there's a value to be returned, or the timeout period has been exceeded.

    For these overloads, if you specify a timeout of -1 milliseconds (Timeout.Infinite), the method will wait indefinitely. It will not throw an exception when a collection has had its CompleteAdding() method called. Once all of the collections passed to it have been completed and are empty, it returns -1 as the collection index value.

    A simple example that shows this working:

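    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<int> c1 = new BlockingCollection<int>(), c2 = new BlockingCollection<int>();

            // Start the producer; it adds items and then completes both collections.
            Task task = Produce(c1, c2);

            // A timeout of -1 blocks indefinitely. The call returns -1 only once
            // both collections are marked complete and are empty.
            while (BlockingCollection<int>.TryTakeFromAny(new[] { c1, c2 }, out int value, -1) >= 0)
            {
                Console.WriteLine($"value: {value}");
            }

            Console.WriteLine($"task.IsCompleted: {task.IsCompleted}");
            task.Wait();
        }

        // Adds one item to each collection, with delays, completing each in turn.
        private static async Task Produce(BlockingCollection<int> c1, BlockingCollection<int> c2)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            c1.Add(1);
            await Task.Delay(TimeSpan.FromSeconds(1));
            c1.CompleteAdding();
            await Task.Delay(TimeSpan.FromSeconds(1));
            c2.Add(2);
            await Task.Delay(TimeSpan.FromSeconds(1));
            c2.CompleteAdding();
        }
    }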
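
    If the same consuming loop is needed in more than one place, it could be wrapped in a small helper. The following is only a sketch: BlockingCollectionHelpers and ConsumeAny are hypothetical names, not part of the .NET API, and the code relies on the TryTakeFromAny() behavior with an infinite timeout described above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper class; the name is an assumption, not a .NET type.
static class BlockingCollectionHelpers
{
    // Yields (collection index, item) pairs, blocking between items, and ends
    // the sequence once TryTakeFromAny reports -1 (all collections are
    // completed and empty).
    public static IEnumerable<(int Index, T Item)> ConsumeAny<T>(
        params BlockingCollection<T>[] collections)
    {
        while (true)
        {
            // Timeout.Infinite is -1: wait until an item arrives or nothing
            // more can arrive.
            int index = BlockingCollection<T>.TryTakeFromAny(
                collections, out T item, Timeout.Infinite);

            if (index < 0)
            {
                yield break;
            }

            yield return (index, item);
        }
    }
}
```

    With such a helper, the loop from the example becomes foreach (var (index, item) in BlockingCollectionHelpers.ConsumeAny(c1, c2)) { ... }. The sketch deliberately sticks to an infinite wait: with a finite timeout, a return value of -1 is ambiguous, because it is also what the method returns when the timeout expires, so the caller would additionally have to check IsCompleted on each collection.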