I have got an issue with race conditions. They are outlined in the code example where I write comments // POSSIBLE RACE
. This design is something that I came up with myself, but it's got race issues and I am not sure how to overcome them. Perhaps using semaphores is the wrong choice.
Scenario: A producer should produce jobs while there are jobs in DB queue AND consumers are still processing jobs. If consumers have finished processing jobs, producer should release all consumers and the producer and consumers should exit.
How do I solve the issue below such that I can have a pool of consumers and one producer, where producer signals to consumers when to check queue for more items if they have run out?
Should I be using a different pattern? Should I be using Semaphore, Mutex, or some other kind of locking mechanism?
Thank you for your help! I have been trying to solve this issue for quite some time.
Fiddle: https://dotnetfiddle.net/Widget/SeNqQx
public class Producer
{
readonly int processorCount = Environment.ProcessorCount;
readonly List<Consumer> consumers = new List<Consumer>();
ConcurrentQueue<Job> jobs;
readonly object queueLock = new object();
readonly Semaphore producerSemaphore;
readonly Semaphore consumerSemaphore;
public Producer()
{
producerSemaphore = new Semaphore(1, 1);
consumerSemaphore = new Semaphore(processorCount, processorCount);
}
public void StartTask()
{
jobs = GetJobs();
using (var resetEvent = new ManualResetEvent(false))
{
for (var i = 0; i < processorCount; i++)
{
var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
consumers.Add(consumer);
QueueConsumer(consumer, processorCount, resetEvent);
}
AddJobsToQueueWhenAvailable(resetEvent);
resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
}
}
private ConcurrentQueue<Job> GetJobs(){
var q = new ConcurrentQueue<Job>();
for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
return q;
}
private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
consumer.StartJob();
}
catch (Exception ex)
{
Console.WriteLine("Exception occurred " + ex);
}
finally
{
// Safely decrement the counter
if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
{
resetEvent.Set();
}
}
});
}
private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
while (true) // TODO - replace with cancellation token
{
// lock queue - so that no workers will steal another workers item
lock (queueLock)
{
// check that at least 1 worker is still active
if (consumers.TrueForAll(w => !w.IsRunning))
{
// all jobs complete - release all locks if 0 workers active
consumerSemaphore.Release(processorCount);
return;
}
// poll for new items that have been added to the queue
var newJobs = GetJobs();
// for each item:
foreach (var job in newJobs)
{
// add item to queue
jobs.Enqueue(job);
// If we have any workers halted, let them know there are new items!
if (consumers.Any(w => !w.IsRunning))
{
// POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
// signal worker to continue via semaphore
consumerSemaphore.Release(1);
// wait until worker thread wakes up and takes item before unlocking queue
producerSemaphore.WaitOne();
}
}
} // unlock queue
// sleep for a bit
Thread.Sleep(500); // TODO - replace with cancellation token
}
});
}
}
public class Consumer
{
public bool IsRunning;
ConcurrentQueue<Job> jobs;
private object queueLock;
private Semaphore producerSemaphore;
private Semaphore consumerSemaphore;
public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
{
this.jobs = jobs;
this.queueLock = queueLock;
this.producerSemaphore = producerSemaphore;
this.consumerSemaphore = consumerSemaphore;
}
public void StartJob() {
while(TryGetNextJob(out var job)) {
// do stuff with job
}
}
private bool TryGetNextJob(out Job nextJob)
{
// lock to prevent producer from producing items before we've had a chance to wait
lock (queueLock)
{
if (jobs.TryDequeue(out nextJob))
return true; // we have an item - let's process it
// worker halted
IsRunning = false;
}
// wait for signal from producer
consumerSemaphore.WaitOne();
// once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
var itemDequeued = jobs.TryDequeue(out nextJob);
if (!itemDequeued)
{
return false; // looks like it's time to exit
}
// another item for us to process
IsRunning = true;
// let producer know it's safe to release queueLock
producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
return true;
}
}
public class Job { }
Thanks for the help. I will certainly look into BlockingCollection.
So I actually wasn't far off what I wanted. I just needed to read a bit more on Semaphores (initialise with correct initial count) for the code to work correctly, as well as a few other bits and pieces. Search for EDIT
to see what has changed. Working solution:
public class Producer
{
readonly int processorCount = Environment.ProcessorCount;
readonly List<Consumer> consumers = new List<Consumer>();
ConcurrentQueue<Job> jobs;
readonly object queueLock = new object();
readonly Semaphore producerSemaphore;
readonly Semaphore consumerSemaphore;
int numberOfThreadsRunning;
public Producer()
{
producerSemaphore = new Semaphore(0, 1); // EDIT - MUST START WITH 0 INITIALLY
consumerSemaphore = new Semaphore(0, processorCount); // EDIT - MUST START WITH 0 INITIALLY
numberOfThreadsRunning = processorCount; // EDIT - take copy so that Interlocked.Decrement references the same int variable in memory
}
public void StartTask()
{
jobs = GetJobs();
using (var resetEvent = new ManualResetEvent(false))
{
for (var i = 0; i < processorCount; i++)
{
var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
consumers.Add(consumer);
QueueConsumer(consumer, resetEvent);
}
AddJobsToQueueWhenAvailable(resetEvent);
resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
}
}
private ConcurrentQueue<Job> GetJobs(){
var q = new ConcurrentQueue<Job>();
for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
return q;
}
private void QueueConsumer(Consumer consumer, ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
consumer.StartJob();
}
catch (Exception ex)
{
lock (queueLock)
{
consumers.Remove(worker);
}
Console.WriteLine("Exception occurred " + ex);
}
finally
{
// Safely decrement the counter
if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
{
resetEvent.Set();
}
}
});
}
private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
while (true) // TODO - replace with cancellation token
{
// lock queue - so that no workers will steal another workers item
lock (queueLock)
{
// check that at least 1 worker is still active
if (consumers.TrueForAll(w => !w.IsRunning))
{
// all jobs complete - release all locks if 0 workers active
consumerSemaphore.Release(processorCount);
return;
}
// poll for new items that have been added to the queue
var newJobs = GetJobs();
// for each item:
foreach (var job in newJobs)
{
// add item to queue
jobs.Enqueue(job);
// If we have any workers halted, let them know there are new items!
if (consumers.Any(w => !w.IsRunning))
{
// POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
// EDIT - Ordering does not matter. If semaphore is Released() before WaitOne() is
// called, then consumer will just continue as soon as it calls WaitOne()
// signal worker to continue via semaphore
consumerSemaphore.Release();
// wait until worker thread wakes up and takes item before unlocking queue
producerSemaphore.WaitOne();
}
}
} // unlock queue
// sleep for a bit
Thread.Sleep(500); // TODO - replace with cancellation token
}
});
}
}
public class Consumer
{
public bool IsRunning;
ConcurrentQueue<Job> jobs;
private object queueLock;
private Semaphore producerSemaphore;
private Semaphore consumerSemaphore;
public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
{
this.jobs = jobs;
this.queueLock = queueLock;
this.producerSemaphore = producerSemaphore;
this.consumerSemaphore = consumerSemaphore;
CurrentlyProcessing = true; // EDIT - must default to true so producer doesn't exit prematurely
}
public void StartJob() {
while(TryGetNextJob(out var job)) {
// do stuff with job
}
}
private bool TryGetNextJob(out Job nextJob)
{
// lock to prevent producer from producing items before we've had a chance to wait
lock (queueLock)
{
if (jobs.TryDequeue(out nextJob))
return true; // we have an item - let's process it
// worker halted
IsRunning = false;
}
// wait for signal from producer
consumerSemaphore.WaitOne();
// once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
var itemDequeued = jobs.TryDequeue(out nextJob);
if (!itemDequeued)
{
return false; // looks like it's time to exit
}
// another item for us to process
IsRunning = true;
// let producer know it's safe to release queueLock
producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
// EDIT - Order does not matter. If we call Release() before producer calls WaitOne(), then
// Producer will just continue as soon as it calls WaitOne().
return true;
}
}
public class Job { }