I'm looking to implement the following:
What is the simplest way to implement this, preferably using largely standard data structures in C#?
My initial approach was something like this:
    var queue = new ConcurrentQueue<WorkItem>();
    // ... populate queue with some initial items ...
    await Task.WhenAll(Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () =>
    {
        while (queue.TryDequeue(out var item))
        {
            // Process stuff asynchronously, potentially re-queue work, etc.
        }
    })).ToArray());
But of course, this does not work: a worker may terminate because TryDequeue() fails, while another, still-active worker goes on to re-queue work.
A channel seems quite suitable as well: workers could dequeue until the channel is closed. But the fundamental problem remains: how exactly can I determine the closing condition? Which worker closes the channel, and based on which condition?
You'll need either a BlockingCollection<T> or a Channel<T> as a queue, depending on whether you want your workers to be synchronous or asynchronous. You'll also need an int variable to keep track of the pending work items. This variable must be incremented before adding an item to the queue, and decremented when a work item is done (in a finally block). Incrementing and decrementing must be atomic (Interlocked). When the variable is decremented to zero, you know that all work is done, and you can call either BlockingCollection<T>.CompleteAdding or Channel<T>.Writer.Complete. You can find a usage example of this technique in this answer (the ParallelTraverseHierarchical method).
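As a rough illustration of the counter technique with a Channel<T>, here is a minimal sketch. It is not the code from the linked answer: the WorkQueue.RunAsync name and the process delegate (which returns the follow-up items spawned by one work item) are hypothetical, and ReadAllAsync with await foreach assumes .NET Core 3.0 / C# 8 or later. Because follow-up items are counted before their parent is decremented, the counter should not reach zero while any work is still outstanding.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WorkQueue
{
    // Hypothetical helper: processes the initial items and everything they spawn.
    public static async Task RunAsync<T>(
        IEnumerable<T> initialItems,
        int nWorkers,
        Func<T, Task<IEnumerable<T>>> process)
    {
        var channel = Channel.CreateUnbounded<T>();
        int pending = 0; // items that are queued or currently being processed

        void Enqueue(T item)
        {
            Interlocked.Increment(ref pending); // count the item BEFORE adding it
            channel.Writer.TryWrite(item);      // succeeds while the unbounded channel is open
        }

        // Seed the queue before any worker starts, so the counter
        // cannot drop to zero prematurely.
        foreach (var item in initialItems) Enqueue(item);
        if (pending == 0) channel.Writer.Complete(); // nothing to do at all

        var workers = Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () =>
        {
            // The loop ends once the channel is completed and drained.
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                try
                {
                    // Re-queue follow-up work while this item is still counted.
                    foreach (var child in await process(item)) Enqueue(child);
                }
                finally
                {
                    // Whichever worker finishes the last pending item closes the channel.
                    if (Interlocked.Decrement(ref pending) == 0)
                        channel.Writer.Complete();
                }
            }
        })).ToArray();

        await Task.WhenAll(workers);
    }
}
```

With this shape no worker is designated as "the closer": it is simply whichever one observes the counter reaching zero. If process throws, the finally block still decrements the counter, and the exception surfaces from Task.WhenAll.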