Search code examples
c#multithreadingproducer-consumertpl-dataflowblockingcollection

Producer/ Consumer pattern using threads and EventWaitHandle


I guess it is sort of a code review, but here is my implementation of the producer / consumer pattern. What I would like to know is would there be a case in which the while loops in the ReceivingThread() or SendingThread() methods might stop executing. Please note that EnqueueSend(DataSendEnqeueInfo info) is called from multiple different threads and I probably can't use tasks here since I definitely have to consume commands in a separate thread.

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne();
        while (mSendingThreadQueue.Count > 0)
        {
            DataSendEnqeueInfo item = null;
            lock (mSendingQueueLock)
            {
                item = mSendingThreadQueue.Dequeue();
            }
            ProcessSendingItem(item);
        }
        mSeWaitHandle.Reset();
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    lock (mReceivingQueueLock)
    {
        mReceivingThreadQueue.Enqueue(info);
        mRcWaitHandle.Set();
    }
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
     lock (mSendingQueueLock)
    {
        mSendingThreadQueue.Enqueue(info);
        mSeWaitHandle.Set();
    }
}

P.S the idea here is that am using WaitHandles to put thread to sleep when the queue is empty, and signal them to start when new items are enqueued.

UPDATE I am just going to leave this https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/ ,for people who might be trying to implement Producer/Consumer pattern using TPL or tasks.


Solution

  • Personally, for simple producer-consumer problems, I would just use BlockingCollection. There would be no need to manually code your own synchronization logic. The consuming threads will also block if there are no items present in the queue.

    Here is what your code might look like if you use this class:

    private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
    private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();
    
    public void Stop()
    {
        // No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls
        // below to complete.
        mReceivingThreadQueue.CompleteAdding();
        mSendingThreadQueue.CompleteAdding();
    }
    
    private void ReceivingThread()
    {
        foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
        {
            ProcessReceivingItem(item);
        }
    }
    
    private void SendingThread()
    {
        foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
        {
            ProcessSendingItem(item);
        }
    }
    
    internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
    {
        // You can also use TryAdd() if there is a possibility that you
        // can add items after you have stopped. Otherwise, this can throw an
        // an exception after CompleteAdding() has been called.
        mReceivingThreadQueue.Add(info);
    }
    
    public void EnqueueSend(DataSendEnqeueInfo info)
    {
        mSendingThreadQueue.Add(info);
    }