Search code examples
c# · multithreading · producer-consumer

Producer Consumer race conditions


I have an issue with race conditions. They are marked in the code example below with `// POSSIBLE RACE` comments. I came up with this design myself, but it has race conditions and I am not sure how to overcome them. Perhaps using semaphores is the wrong choice.

Scenario: A producer should produce jobs while there are jobs in DB queue AND consumers are still processing jobs. If consumers have finished processing jobs, producer should release all consumers and the producer and consumers should exit.

How do I solve the issue below such that I can have a pool of consumers and one producer, where producer signals to consumers when to check queue for more items if they have run out?

Should I be using a different pattern? Should I be using Semaphore, Mutex, or some other kind of locking mechanism?

Thank you for your help! I have been trying to solve this issue for quite some time.

Fiddle: https://dotnetfiddle.net/Widget/SeNqQx

/// <summary>
/// Owns the shared job queue, starts one <see cref="Consumer"/> per logical processor on the
/// thread pool, and runs a polling loop that enqueues new jobs and signals halted consumers.
/// </summary>
/// <remarks>
/// This is the original (problematic) version from the question; the NOTE(review) comments
/// below point at the places where the visible code does not do what its comments intend.
/// </remarks>
public class Producer
{
    // Number of consumers to start; also used as the maximum count of consumerSemaphore.
    readonly int processorCount = Environment.ProcessorCount;
    // Every consumer created by StartTask; the polling loop inspects their IsRunning flags.
    readonly List<Consumer> consumers = new List<Consumer>();
    // Shared work queue; assigned in StartTask and handed to every consumer.
    ConcurrentQueue<Job> jobs;
    // Lock shared with the consumers around their first dequeue attempt and the producer's enqueue pass.
    readonly object queueLock = new object();
    // Released by a consumer after it has taken an item; waited on by the polling loop.
    readonly Semaphore producerSemaphore;
    // Released by the polling loop to wake halted consumers; waited on by consumers.
    readonly Semaphore consumerSemaphore;

    /// <summary>Creates the two semaphores used for producer/consumer hand-off.</summary>
    public Producer()
    {
        // NOTE(review): both semaphores are created with their initial count equal to their
        // maximum count (1 of 1, processorCount of processorCount). The first WaitOne() calls
        // on them therefore return immediately instead of blocking until the other side
        // signals, and a Release() while the count is still at its maximum is documented to
        // throw SemaphoreFullException.
        producerSemaphore = new Semaphore(1, 1);
        consumerSemaphore = new Semaphore(processorCount, processorCount);
    }

    /// <summary>
    /// Loads the initial jobs, queues <c>processorCount</c> consumers and the polling loop on
    /// the thread pool, then blocks until the reset event is set.
    /// </summary>
    public void StartTask()
    {
        jobs = GetJobs();
        using (var resetEvent = new ManualResetEvent(false))
        {
            for (var i = 0; i < processorCount; i++)
            {
                var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
                consumers.Add(consumer);
                // processorCount is passed by value here - see the note in QueueConsumer.
                QueueConsumer(consumer, processorCount, resetEvent);
            }

            AddJobsToQueueWhenAvailable(resetEvent);
            resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
        }
    }

    /// <summary>Builds a fresh queue holding five new <see cref="Job"/> instances.</summary>
    /// <returns>A new queue; in the real system the jobs would be read from a DB queue.</returns>
    private ConcurrentQueue<Job> GetJobs(){
        var q = new ConcurrentQueue<Job>();
        for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
        return q;
    }

    /// <summary>
    /// Runs <paramref name="consumer"/> on a thread-pool thread, logs any exception it throws,
    /// and sets <paramref name="resetEvent"/> when the decremented counter reaches zero.
    /// </summary>
    /// <param name="consumer">The consumer whose StartJob loop is executed.</param>
    /// <param name="numberOfThreadsRunning">Starting value of the counter that is decremented on exit.</param>
    /// <param name="resetEvent">Event that StartTask is waiting on.</param>
    private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                consumer.StartJob();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occurred " + ex);
            }
            finally
            {

                // Safely decrement the counter
                // NOTE(review): numberOfThreadsRunning is a by-value parameter, so each call to
                // QueueConsumer (and the lambda it creates) has its own copy that starts at
                // processorCount. Each copy is decremented exactly once, so the result is
                // processorCount - 1 and only equals 0 when processorCount is 1; otherwise
                // resetEvent is never set and StartTask keeps waiting.
                if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                {
                        resetEvent.Set();
                }
            }
        });
    }
    /// <summary>
    /// Queues the polling loop on the thread pool: every 500 ms it takes the queue lock, exits
    /// if no consumer reports IsRunning, otherwise enqueues newly fetched jobs and wakes
    /// halted consumers one item at a time.
    /// </summary>
    /// <param name="resetEvent">Not used by the body of this method.</param>
    private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true) // TODO - replace with cancellation token
            {
                // lock queue - so that no workers will steal another workers item
                lock (queueLock)
                {
                    // check that at least 1 worker is still active
                    // NOTE(review): Consumer.IsRunning is a bool field that is only set to true
                    // after a consumer has been woken and dequeued an item, so this condition
                    // may already hold on the first pass while consumers are still working.
                    if (consumers.TrueForAll(w => !w.IsRunning))
                    {
                        // all jobs complete - release all locks if 0 workers active
                        consumerSemaphore.Release(processorCount);
                        return;
                    }

                    // poll for new items that have been added to the queue
                    var newJobs = GetJobs();

                    // for each item:
                    foreach (var job in newJobs)
                    {
                        // add item to queue
                        jobs.Enqueue(job);

                        // If we have any workers halted, let them know there are new items!
                        if (consumers.Any(w => !w.IsRunning))
                        {
                            // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                            // signal worker to continue via semaphore
                            consumerSemaphore.Release(1);
                            // wait until worker thread wakes up and takes item before unlocking queue
                            producerSemaphore.WaitOne();
                        }
                    }
                } // unlock queue

                // sleep for a bit
                Thread.Sleep(500); // TODO - replace with cancellation token
            }
        });
    }
}

/// <summary>
/// Worker that repeatedly takes jobs from the shared queue and, when the queue is empty,
/// halts on the consumer semaphore until the producer signals it.
/// </summary>
/// <remarks>
/// This is the original (problematic) version from the question; the NOTE(review) comments
/// mark behaviour that follows from the visible code but may not match the author's intent.
/// </remarks>
public class Consumer
{
    // Read by the producer to decide whether this worker is halted.
    // NOTE(review): no initialiser and not assigned in the constructor, so it starts as false.
    public bool IsRunning;
    // Shared work queue supplied by the producer.
    ConcurrentQueue<Job> jobs;
    // Lock shared with the producer; held only around the first dequeue attempt.
    private object queueLock;
    // Released by this consumer after it has taken an item following a wake-up.
    private Semaphore producerSemaphore;
    // Waited on by this consumer when the queue was empty.
    private Semaphore consumerSemaphore;

    /// <summary>Stores the shared queue, lock and semaphores handed over by the producer.</summary>
    /// <param name="jobs">Queue to dequeue jobs from.</param>
    /// <param name="queueLock">Lock object shared with the producer.</param>
    /// <param name="producerSemaphore">Semaphore released once an item was taken after a wake-up.</param>
    /// <param name="consumerSemaphore">Semaphore waited on when no item is available.</param>
    public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
    {
        this.jobs = jobs;
        this.queueLock = queueLock;
        this.producerSemaphore = producerSemaphore;
        this.consumerSemaphore = consumerSemaphore;
    }

    /// <summary>Processes jobs until <c>TryGetNextJob</c> returns false.</summary>
    public void StartJob() {
        while(TryGetNextJob(out var job)) {
            // do stuff with job
        }
    }

    /// <summary>
    /// Tries to dequeue a job; if the queue is empty, marks this worker as halted, waits on the
    /// consumer semaphore and tries once more.
    /// </summary>
    /// <param name="nextJob">The dequeued job when the method returns true.</param>
    /// <returns>True if a job was obtained; false if none was available after being woken.</returns>
    private bool TryGetNextJob(out Job nextJob)
    {
        // lock to prevent producer from producing items before we've had a chance to wait
        lock (queueLock)
        {
            if (jobs.TryDequeue(out nextJob))
                return true; // we have an item - let's process it

            // worker halted
            IsRunning = false;
        }

        // wait for signal from producer
        // NOTE(review): the lock has already been released at this point, so the producer can
        // observe IsRunning == false and call Release() before this WaitOne() is reached.
        consumerSemaphore.WaitOne();

        // once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
        // This second dequeue is performed without holding queueLock.
        var itemDequeued = jobs.TryDequeue(out nextJob);
        if (!itemDequeued)
        {
            return false; // looks like it's time to exit
        }

        // another item for us to process 
        IsRunning = true;
        // let producer know it's safe to release queueLock        
        // NOTE(review): if the producer semaphore is still at its maximum count when this runs,
        // Release() would presumably throw SemaphoreFullException - confirm against how the
        // semaphore is constructed by the caller.
        producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)

        return true;
    }

}

/// <summary>
/// Placeholder unit of work passed from the producer to the consumers; it carries no data.
/// </summary>
public class Job
{
}

Solution

  • Thanks for the help. I will certainly look into BlockingCollection.

    So I actually wasn't far off what I wanted. I just needed to read a bit more on Semaphores (initialise with correct initial count) for the code to work correctly, as well as a few other bits and pieces. Search for EDIT to see what has changed. Working solution:

    /// <summary>
    /// Owns the shared job queue, starts one <see cref="Consumer"/> per logical processor on the
    /// thread pool, and runs a polling loop that enqueues new jobs and wakes halted consumers.
    /// <see cref="StartTask"/> returns once every consumer thread has exited.
    /// </summary>
    public class Producer
    {
        // Number of consumers to start; also the maximum count of consumerSemaphore.
        readonly int processorCount = Environment.ProcessorCount;
        // Consumers still taking part; guarded by queueLock once the threads are running.
        readonly List<Consumer> consumers = new List<Consumer>();
        // Shared work queue; assigned in StartTask and handed to every consumer.
        ConcurrentQueue<Job> jobs;
        // Lock shared with the consumers around queue hand-off.
        readonly object queueLock = new object();
        // Released by a consumer once it has taken the item it was woken for.
        readonly Semaphore producerSemaphore;
        // Released by the polling loop to wake halted consumers.
        readonly Semaphore consumerSemaphore;
        // Consumer threads that have not finished yet; decremented with Interlocked.
        int numberOfThreadsRunning;

        /// <summary>Creates the hand-off semaphores and initialises the running-thread counter.</summary>
        public Producer()
        {
            producerSemaphore = new Semaphore(0, 1); // EDIT - MUST START WITH 0 INITIALLY
            consumerSemaphore = new Semaphore(0, processorCount); // EDIT - MUST START WITH 0 INITIALLY
            numberOfThreadsRunning = processorCount; // EDIT - take copy so that Interlocked.Decrement references the same int variable in memory
        }

        /// <summary>
        /// Loads the initial jobs, queues <c>processorCount</c> consumers and the polling loop on
        /// the thread pool, then blocks until the last consumer thread has finished.
        /// </summary>
        public void StartTask()
        {
            jobs = GetJobs();
            using (var resetEvent = new ManualResetEvent(false))
            {
                for (var i = 0; i < processorCount; i++)
                {
                    var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
                    consumers.Add(consumer);
                    QueueConsumer(consumer, resetEvent);
                }

                AddJobsToQueueWhenAvailable(resetEvent);
                resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
            }
        }

        /// <summary>Builds a fresh queue holding five new <see cref="Job"/> instances.</summary>
        /// <returns>A new queue; in the real system the jobs would be read from a DB queue.</returns>
        private ConcurrentQueue<Job> GetJobs(){
            var q = new ConcurrentQueue<Job>();
            for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
            return q;
        }

        /// <summary>
        /// Runs <paramref name="consumer"/> on a thread-pool thread. A consumer that throws is
        /// removed from the active list; the last consumer to finish sets <paramref name="resetEvent"/>.
        /// </summary>
        /// <param name="consumer">The consumer whose StartJob loop is executed.</param>
        /// <param name="resetEvent">Event that StartTask is waiting on.</param>
        private void QueueConsumer(Consumer consumer, ManualResetEvent resetEvent)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    consumer.StartJob();
                }
                catch (Exception ex)
                {
                    // Drop the faulted consumer so the polling loop no longer treats it as an
                    // active worker (its IsRunning flag would otherwise stay true forever).
                    lock (queueLock)
                    {
                        consumers.Remove(consumer);
                    }
                    Console.WriteLine("Exception occurred " + ex);
                }
                finally
                {

                    // Safely decrement the counter
                    if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                    {
                        resetEvent.Set();
                    }
                }
            });
        }

        /// <summary>
        /// Queues the polling loop on the thread pool: every 500 ms it takes the queue lock, exits
        /// (releasing all consumers) if no consumer reports IsRunning, otherwise enqueues newly
        /// fetched jobs and wakes halted consumers one item at a time.
        /// </summary>
        /// <param name="resetEvent">Not used by this method; kept so existing callers compile unchanged.</param>
        private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                while (true) // TODO - replace with cancellation token
                {
                    // lock queue - so that no workers will steal another workers item
                    lock (queueLock)
                    {
                        // check that at least 1 worker is still active
                        if (consumers.TrueForAll(w => !w.IsRunning))
                        {
                            // all jobs complete - release all locks if 0 workers active
                            consumerSemaphore.Release(processorCount);
                            return;
                        }

                        // poll for new items that have been added to the queue
                        var newJobs = GetJobs();

                        // for each item:
                        foreach (var job in newJobs)
                        {
                            // add item to queue
                            jobs.Enqueue(job);

                            // If we have any workers halted, let them know there are new items!
                            if (consumers.Any(w => !w.IsRunning))
                            {
                                // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                                // EDIT - Ordering does not matter. If semaphore is Released() before WaitOne() is 
                                //        called, then consumer will just continue as soon as it calls WaitOne()
                                // signal worker to continue via semaphore
                                consumerSemaphore.Release();
                                // wait until worker thread wakes up and takes item before unlocking queue
                                producerSemaphore.WaitOne();
                            }
                        }
                    } // unlock queue

                    // sleep for a bit
                    Thread.Sleep(500); // TODO - replace with cancellation token
                }
            });
        }
    }
    
    /// <summary>
    /// Worker that repeatedly takes jobs from the shared queue and, when the queue is empty,
    /// halts on the consumer semaphore until the producer signals it.
    /// </summary>
    public class Consumer
    {
        // Read by the producer to decide whether this worker is halted; starts as true.
        public bool IsRunning;
        // Shared work queue supplied by the producer.
        ConcurrentQueue<Job> jobs;
        // Lock shared with the producer; held only around the first dequeue attempt.
        private object queueLock;
        // Released by this consumer after it has taken an item following a wake-up.
        private Semaphore producerSemaphore;
        // Waited on by this consumer when the queue was empty.
        private Semaphore consumerSemaphore;

        /// <summary>
        /// Stores the shared queue, lock and semaphores handed over by the producer and marks
        /// the worker as running.
        /// </summary>
        /// <param name="jobs">Queue to dequeue jobs from.</param>
        /// <param name="queueLock">Lock object shared with the producer.</param>
        /// <param name="producerSemaphore">Semaphore released once an item was taken after a wake-up.</param>
        /// <param name="consumerSemaphore">Semaphore waited on when no item is available.</param>
        public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
        {
            this.jobs = jobs;
            this.queueLock = queueLock;
            this.producerSemaphore = producerSemaphore;
            this.consumerSemaphore = consumerSemaphore;
            IsRunning = true; // EDIT - must default to true so producer doesn't exit prematurely
        }

        /// <summary>Processes jobs until <c>TryGetNextJob</c> returns false.</summary>
        public void StartJob() {
            while(TryGetNextJob(out var job)) {
                // do stuff with job
            }
        }

        /// <summary>
        /// Tries to dequeue a job; if the queue is empty, marks this worker as halted, waits on the
        /// consumer semaphore and tries once more.
        /// </summary>
        /// <param name="nextJob">The dequeued job when the method returns true.</param>
        /// <returns>True if a job was obtained; false if none was available after being woken.</returns>
        private bool TryGetNextJob(out Job nextJob)
        {
            // lock to prevent producer from producing items before we've had a chance to wait
            lock (queueLock)
            {
                if (jobs.TryDequeue(out nextJob))
                    return true; // we have an item - let's process it

                // worker halted
                IsRunning = false;
            }

            // wait for signal from producer
            consumerSemaphore.WaitOne();

            // once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
            var itemDequeued = jobs.TryDequeue(out nextJob);
            if (!itemDequeued)
            {
                return false; // looks like it's time to exit
            }

            // another item for us to process
            IsRunning = true;
            // let producer know it's safe to release queueLock
            producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
            // EDIT - Order does not matter. If we call Release() before producer calls WaitOne(), then
            //        Producer will just continue as soon as it calls WaitOne().

            return true;
        }

    }
    
    /// <summary>
    /// Placeholder unit of work passed from the producer to the consumers; it carries no data.
    /// </summary>
    public class Job
    {
    }