c# · multithreading · concurrency

How to implement a finite concurrent work queue that allows workers to re-enqueue work


I'm looking to implement the following:

  • A work queue which I will pre-populate with some initial work items
  • Once done, I need to launch several worker tasks (or threads), which will dequeue work from this queue
  • While processing a work item, each worker is allowed to re-enqueue items into the queue
  • All worker threads must remain running and keep attempting to dequeue new work items until A) there is no work currently left to do (the queue is empty), and B) it is certain that no other worker will enqueue any further work (i.e., no worker is currently processing an item)

What is the simplest way to implement this, preferably using largely standard data structures in C#?


My initial approach was something like this:

var queue = new ConcurrentQueue<WorkItem>();
// ... populate queue with some initial items ...

await Task.WhenAll(Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () => {
  while(queue.TryDequeue(out var item))
  {
     // Process stuff asynchronously, potentially re-queue work, etc.
  }
})).ToArray());

But of course, this does not work - it's possible that a worker terminates because TryDequeue() fails, while another, still-active worker re-queues work afterwards.

A channel seems quite suitable as well: workers could dequeue until the channel is closed. But the fundamental problem remains: how exactly do I determine the closing condition? Which worker closes the channel, and based on what condition?


Solution

  • You'll need either a BlockingCollection<T> or a Channel<T> as a queue, depending on whether you want your workers to be synchronous or asynchronous. You'll also need an int variable to keep track of the pending work items. This variable must be incremented before adding an item to the queue, and decremented when a work item is done (in a finally block). Incrementing and decrementing must be atomic (Interlocked). When the variable is decremented to zero, you know that all work is done, and you can call either the BlockingCollection<T>.CompleteAdding or the Channel<T>.Writer.Complete method. You can find a usage example of this technique in this answer (the ParallelTraverseHierarchical method).
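
Below is a minimal sketch of the Channel<T> variant. The WorkItem type, the Enqueue helper and the simulated processing inside the loop are placeholders, not part of the original answer; the point is that the counter is incremented before every write, decremented in a finally block, and whichever worker drops it to zero completes the channel:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class WorkItem { public int Value; }

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<WorkItem>();
        int pendingCount = 0;

        // Increment the pending counter BEFORE writing the item,
        // so the counter can never hit zero while an item is in flight.
        void Enqueue(WorkItem item)
        {
            Interlocked.Increment(ref pendingCount);
            if (!channel.Writer.TryWrite(item))
                throw new InvalidOperationException("The channel was completed prematurely.");
        }

        // Pre-populate the queue with some initial work items.
        foreach (var i in Enumerable.Range(0, 10))
            Enqueue(new WorkItem { Value = i });

        const int nWorkers = 4;
        var workers = Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () =>
        {
            // ReadAllAsync completes once the channel is completed and drained.
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                try
                {
                    // Placeholder processing; re-enqueue follow-up work as needed.
                    if (item.Value > 0)
                        Enqueue(new WorkItem { Value = item.Value - 1 });
                    await Task.Yield();
                }
                finally
                {
                    // Zero means: nothing queued and no worker mid-processing,
                    // so no further work can ever appear. Close the channel.
                    if (Interlocked.Decrement(ref pendingCount) == 0)
                        channel.Writer.Complete();
                }
            }
        })).ToArray();

        await Task.WhenAll(workers);
    }
}

Because each re-enqueued item increments the counter before its parent item is decremented, the counter reaching zero really does mean the queue is empty and every worker is idle, which is exactly the termination condition from the question.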