c# .net thread-safety condition-variable

I would like to use a condition variable to know when the message queue is not empty, and wait on it in "HandleMessageQueue", which runs as a thread



private static Queue<Message> messages = new Queue<Message>();

/// <summary>
/// Removes and returns the first message in the queue.
/// </summary>
/// <returns>The first message in the queue.</returns>
public static Message GetFirst()
{
  return messages.Dequeue();
}

In another class:

/// <summary>
/// Runs while clients are connected and handles the queued messages.
/// </summary>
public static void HandleMessageQueue()
{
   // ...
}

Solution

  • What you're probably looking for is a simple producer-consumer pattern. In this case I'd recommend using .NET's BlockingCollection, which allows you to easily handle the following cases:

    1. have one thread push stuff in a queue
    2. have another thread block until stuff is available
    3. make the whole thing easy to shut down without having to forcibly terminate the thread

    Here's a short code sample; read the comments for more information about what each part does:

    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    public class Queue : IDisposable
    {
        private readonly Thread _messageThread; // thread for processing messages
        private readonly BlockingCollection<Message> _messages; // queue for messages
        private readonly CancellationTokenSource _cancellation; // used to abort the processing when we're done
    
        // initializes everything and starts a processing thread
        public Queue()
        {
            _messages = new BlockingCollection<Message>();
            _cancellation = new CancellationTokenSource();
    
            _messageThread = new Thread(ProcessMessages);
            _messageThread.Start();
        }
    
        // processing thread function
        private void ProcessMessages()
        {
            try
            {
                while (!_cancellation.IsCancellationRequested)
                {
                    // Take() blocks until either:
                    // 1) a message is available, in which case it returns it, or
                    // 2) the cancellation token is cancelled, in which case it throws an OperationCanceledException
    
                    var message = _messages.Take(_cancellation.Token); 
                    // process the message here
                }
            }
            catch (OperationCanceledException)
            {
                // Take() was cancelled, let the thread exit
            }
        }
    
        // pushes a message
        public void QueueMessage(Message message)
        {
            _messages.Add(message);
        }
    
        // stops processing and cleans up resources
        public void Dispose()
        {
            _cancellation.Cancel(); // let Take() abort by throwing
            _messageThread.Join(); // wait for thread to exit
            _cancellation.Dispose(); // release the cancellation source
            _messages.Dispose(); // release the queue
        }
    }
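
    One thing to keep in mind with the class above: cancelling the token stops the loop right away, so messages still sitting in the queue at that point may never be processed. If pending messages should be drained before shutting down, a variation can be sketched with CompleteAdding() and GetConsumingEnumerable(). This is only a minimal sketch under assumptions: BlockingCollectionDemo and Run are hypothetical names, and string stands in for the Message type.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo type for illustration; not part of the original sample.
public static class BlockingCollectionDemo
{
    // Starts one consumer thread, pushes the given messages from the calling
    // thread, shuts down gracefully and returns what the consumer processed.
    public static List<string> Run(IEnumerable<string> messages)
    {
        var processed = new List<string>();

        using (var queue = new BlockingCollection<string>())
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the queue is empty. The loop ends once
                // CompleteAdding() has been called AND the queue is drained.
                foreach (var message in queue.GetConsumingEnumerable())
                {
                    processed.Add(message); // "process" the message
                }
            });
            consumer.Start();

            foreach (var message in messages)
            {
                queue.Add(message);
            }

            queue.CompleteAdding(); // no more items will be added
            consumer.Join();        // wait until the remaining items are handled
        }

        return processed;
    }
}
```

    Calling BlockingCollectionDemo.Run(new[] { "a", "b", "c" }) should hand back the three messages in the order they were added, since BlockingCollection<T> uses a FIFO queue by default and there is a single producer here. Whether to cancel or to drain depends on whether unprocessed messages can be dropped at shutdown.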
    

    Another option would be to combine a ConcurrentQueue<T> with a ManualResetEvent (events are roughly the .NET equivalent of condition variables), but that would mean doing by hand what BlockingCollection<T> already does for you.
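
    To show what "doing it by hand" involves, here is a rough sketch of the ConcurrentQueue<T> plus ManualResetEvent approach. It assumes a single consumer thread and that nothing is enqueued after Dispose() is called. EventSignalledQueue, Enqueue and Processed are hypothetical names, and string stands in for the Message type.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical type for illustration; supports ONE consumer thread only.
public sealed class EventSignalledQueue : IDisposable
{
    private readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
    private readonly ManualResetEvent _signal = new ManualResetEvent(false);
    private readonly List<string> _processed = new List<string>();
    private readonly Thread _consumer;
    private volatile bool _stopping;

    public EventSignalledQueue()
    {
        _consumer = new Thread(Consume);
        _consumer.Start();
    }

    // Only safe to read after Dispose() has returned.
    public IReadOnlyList<string> Processed => _processed;

    public void Enqueue(string message)
    {
        _messages.Enqueue(message); // publish the item first...
        _signal.Set();              // ...then wake the consumer
    }

    private void Consume()
    {
        while (true)
        {
            _signal.WaitOne(); // sleep until a producer (or Dispose) signals
            _signal.Reset();   // reset BEFORE draining so a later Set() is not lost

            // Read the flag before draining: everything enqueued before the
            // stop request is then guaranteed to be seen by the loop below.
            bool stopping = _stopping;

            while (_messages.TryDequeue(out var message))
            {
                _processed.Add(message); // "process" the message
            }

            if (stopping) return;
        }
    }

    // Sketch only: calling Dispose() twice is not handled.
    public void Dispose()
    {
        _stopping = true;
        _signal.Set();     // wake the consumer so it can notice the flag
        _consumer.Join();  // wait for the thread to exit
        _signal.Dispose();
    }
}
```

    The ordering details (enqueue before Set(), Reset() before draining, reading the stop flag before the final drain) are exactly the kind of thing BlockingCollection<T> takes care of internally, which is why it is the easier choice here.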