I've implemented a simple Producer-Consumer message queue.
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <deque>
#define MESSAGE_QUIT 1
struct MessageQueue
{
std::deque<int> message_ids;
std::mutex mutex;
std::condition_variable condition_variable;
};
void SleepFor(int time_in_millis)
{
std::this_thread::sleep_for(std::chrono::milliseconds(time_in_millis));
}
void ProcessMessage(int message_id)
{
std::cout << "Processing Message #" << message_id << '\n';
}
void Producer(MessageQueue *messages)
{
for (int message_id = 10; message_id >= MESSAGE_QUIT; --message_id) {
std::unique_lock<std::mutex> guard(messages->mutex);
messages->message_ids.push_back(message_id);
guard.unlock();
messages->condition_variable.notify_one();
SleepFor(200);
}
}
void Consumer(MessageQueue *messages)
{
int next_message_id = -1;
while (next_message_id != MESSAGE_QUIT) {
std::unique_lock<std::mutex> guard(messages->mutex);
messages->condition_variable.wait(guard);
next_message_id = messages->message_ids.front();
messages->message_ids.pop_front();
guard.unlock();
ProcessMessage(next_message_id);
}
}
int main()
{
MessageQueue messages;
std::thread producer_thread(&Producer, &messages);
std::thread consumer_thread(&Consumer, &messages);
producer_thread.join();
consumer_thread.join();
}
The race condition: in some cases, the condition variable is calling notify_one()
in the producer thread while it is not in the waiting state in the consumer thread. How would you solve this? I am not considering the case of spurious waking for now.
Condition variables can spuriously wake up.
messages->condition_variable.wait(guard)
So don't do this. There are other reasons to not do this, including the fact that you can wait on a condition variable when there is data ready.
messages->condition_variable.wait(guard, [&]{return !messages->message_ids().empty();)
This won't wait if there are already messages in the queue. It also deals with spurious wakeups.