Search code examples
c++multithreadingc++11concurrencyproducer-consumer

Race condition in Producer-Consumer: limit notifications to when condition variable is waiting


I've implemented a simple Producer-Consumer message queue.

#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <deque>


#define MESSAGE_QUIT 1


struct MessageQueue
{
    std::deque<int> message_ids;
    std::mutex mutex;
    std::condition_variable condition_variable;
};


void SleepFor(int time_in_millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(time_in_millis));
}


void ProcessMessage(int message_id)
{
    std::cout << "Processing Message #" << message_id << '\n';
}


void Producer(MessageQueue *messages)
{
    for (int message_id = 10; message_id >= MESSAGE_QUIT; --message_id) {
        std::unique_lock<std::mutex> guard(messages->mutex);    
        messages->message_ids.push_back(message_id);            
        guard.unlock();
        messages->condition_variable.notify_one();
        SleepFor(200);
    }
}


void Consumer(MessageQueue *messages)
{
    int next_message_id = -1;

    while (next_message_id != MESSAGE_QUIT) {
        std::unique_lock<std::mutex> guard(messages->mutex);
        messages->condition_variable.wait(guard);
        next_message_id = messages->message_ids.front();
        messages->message_ids.pop_front();
        guard.unlock();
        ProcessMessage(next_message_id);
    }
}


int main()
{
    MessageQueue messages;

    std::thread producer_thread(&Producer, &messages);
    std::thread consumer_thread(&Consumer, &messages);

    producer_thread.join();
    consumer_thread.join();
}

The race condition: in some cases, the condition variable is calling notify_one() in the producer thread while it is not in the waiting state in the consumer thread. How would you solve this? I am not considering the case of spurious waking for now.


Solution

  • Condition variables can spuriously wake up.

    messages->condition_variable.wait(guard)
    

    So don't do this. There are other reasons to not do this, including the fact that you can wait on a condition variable when there is data ready.

    messages->condition_variable.wait(guard, [&]{return !messages->message_ids().empty();)
    

    This won't wait if there are already messages in the queue. It also deals with spurious wakeups.