I have implemented a fairly standard single-consumer, multiple-producer pattern in C++, with the addition that there is a limit on the number of tasks in the queue.
A `Worker` runs a message queue on a separate thread. Producers send tasks to the `Worker`. If there are already `max_num_tasks_` tasks in the queue, the producers must wait.
The code compiles in VS2022 for Windows x64 using toolset v143.
The following code hangs sporadically in `Worker::Send()` at the statement `cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });`. Does anyone see what I'm doing wrong?
Implementation of the `Worker`:

    #pragma once
    #include <queue>
    #include <mutex>
    #include <functional>
    #include <condition_variable>
    #include <thread>

    class Worker {
    public:
        ~Worker() {
            Send([this] { done = true; });
            thd.join();
        }

        Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
            while (!done) {
                mq.PopFront()();
                cv_.notify_all();
            }
        })
        { }

        void Send(std::function<void()>&& m) {
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [&] { return (mq.Size() < max_num_tasks_); });
            }
            mq.PushBack(std::move(m));
        }

    private:
        bool done;
        size_t max_num_tasks_;
        ThreadSafeQueue<std::function<void()>> mq;
        std::thread thd;
        std::mutex m_;
        std::condition_variable cv_;
    };
Implementation of the `ThreadSafeQueue`:

    #pragma once
    #include <queue>
    #include <mutex>
    #include <functional>
    #include <condition_variable>

    template <typename T>
    class ThreadSafeQueue {
    public:
        void PushBack(T&& val) {
            {
                std::unique_lock<std::mutex> lock(q_mutex);
                q.push(std::move(val));
            }
            cv.notify_one();
        }

        void PushBack(const T& val) {
            {
                std::unique_lock<std::mutex> lock(q_mutex);
                q.push(val);
            }
            cv.notify_one();
        }

        T PopFront() {
            std::unique_lock<std::mutex> lock(q_mutex);
            cv.wait(lock, [&] { return (!q.empty()); });
            T v = q.front();
            q.pop();
            return std::move(v);
        }

        bool Empty() const {
            std::unique_lock<std::mutex> lock(q_mutex);
            return q.empty();
        }

        size_t Size() const {
            std::unique_lock<std::mutex> lock(q_mutex);
            return q.size();
        }

    private:
        mutable std::mutex q_mutex;
        std::condition_variable cv;
        std::queue<T> q;
    };
Example of unit test that occasionally triggers the issue:

    TEST(WorkerTests, Compute_PI) {
        std::atomic<double> value;
        auto multiply_by_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value * v;
        };
        auto add_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value + v;
        };
        auto add_one = [&value] {
            value = value + 1;
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
        };
        auto multiply_by_three = [&value] {
            value = value * 3;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        };
        for (int i = 0; i < 100; i++) {
            value = 0.0;
            {
                Worker worker(1);
                worker.Send(add_one);
                worker.Send(multiply_by_pi_over_four);
                worker.Send(multiply_by_three);
                worker.Send([]() {});
                worker.Send(add_pi_over_four);
            }
            EXPECT_GE(3.15, value.load());
            EXPECT_LE(3.14, value.load());
        }
    }
`cv_`'s synchronization is broken in the shown code:

    cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
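The sequence of events performed by `wait()` is as follows:

1. The mutex is initially locked.
2. The `wait()` condition is checked.
3. If the condition is false, the mutex is atomically unlocked and the thread blocks on the condition variable.

Step 3 is an atomic, indivisible operation, but step 2 is not indivisible with step 3; it is a separate step. Holding the mutex during step 2 only keeps out threads that also lock `m_`. The worker thread never does: it pops from `mq` under the queue's internal `q_mutex` and calls `cv_.notify_all()` without holding `m_`, so it is free to run in the gap between steps 2 and 3.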
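To make the gap between steps 2 and 3 concrete, here is a sketch of what `cv.wait(lock, pred)` is specified to do, written as the equivalent loop (the loop also re-checks the condition after spurious wakeups). `WaitWithPredicate` and `DemoReady` are hypothetical names used only for illustration. In the demo, the setter thread changes `ready` while holding the same mutex that the waiter holds during its check, so it cannot slip in between steps 2 and 3. The `Worker` in the question breaks exactly this rule, because it shrinks `mq` without ever locking `m_`.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: behaves like cv.wait(lock, pred).
template <class Predicate>
void WaitWithPredicate(std::condition_variable& cv,
                       std::unique_lock<std::mutex>& lock,
                       Predicate pred) {
    // Step 1: the caller already holds `lock`.
    while (!pred()) {   // Step 2: evaluate the condition while holding the lock.
        cv.wait(lock);  // Step 3: atomically unlock and block; relock on wake-up.
    }
}

// Hypothetical demo: the shared state is modified under the same mutex the
// waiter uses, so no notification can be lost between steps 2 and 3.
bool DemoReady() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    std::thread setter([&] {
        {
            std::lock_guard<std::mutex> guard(m);  // same mutex as the waiter
            ready = true;
        }
        cv.notify_one();  // notifying after unlocking is fine here
    });

    {
        std::unique_lock<std::mutex> lock(m);
        WaitWithPredicate(cv, lock, [&] { return ready; });
    }
    setter.join();
    return ready;
}
```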
So the following interleaving is possible:

A. Step 2 happens: `return (mq.Size() < max_num_tasks_);` evaluates to false.
B. Before the producer thread reaches step 3, the worker thread runs and quickly works through everything in `mq`, signaling the condition variable after each task, until `mq` is empty.
C. The producer thread resumes, moves to step 3, unlocks the mutex and waits for someone to signal the condition variable.
D. Nothing ever will. The producer thread was not yet waiting while the worker thread signaled the condition variable repeatedly as it drained the queue, so all of those notifications were lost.
E. The call to `wait()` now waits for a signal that never comes, while the worker thread is blocked in `PopFront()` on the now-empty queue. Each thread waits for the other, and the program hangs.
Basically, this `wait()` needs to use the same mutex that protects the queue itself, and a condition variable that the consumer signals after each pop. The logic of `wait()`ing for the queue to fall below its maximum size needs to be moved into the event queue (`ThreadSafeQueue`), so that the size check and the pop that changes the size are serialized by the same mutex.
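A minimal sketch of that suggestion, assuming C++11 or later. `BoundedQueue`, `not_empty_`, `not_full_` and the rewritten `Worker` are hypothetical names, not code from the question. Both waits share one mutex, and every change to the queue happens under that mutex, so a pop can no longer run between a producer's size check and its wait. Two condition variables are used instead of one so that each `notify_one()` wakes a thread that is waiting for the matching condition.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical bounded queue: the "not full" wait lives inside the queue and
// uses the same mutex that guards q_. Assumes max_size >= 1.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t max_size) : max_size_(max_size) {}

    void PushBack(T val) {
        {
            std::unique_lock<std::mutex> lock(m_);
            not_full_.wait(lock, [&] { return q_.size() < max_size_; });
            q_.push(std::move(val));
        }
        not_empty_.notify_one();  // wake the consumer
    }

    T PopFront() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [&] { return !q_.empty(); });
        T v = std::move(q_.front());
        q_.pop();  // the size changes under m_, the same mutex producers check it with
        lock.unlock();
        not_full_.notify_one();  // one slot freed: wake one waiting producer
        return v;
    }

private:
    std::mutex m_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    std::queue<T> q_;
    std::size_t max_size_;
};

// Hypothetical Worker built on BoundedQueue: no separate m_/cv_ needed.
class Worker {
public:
    explicit Worker(std::size_t max_num_tasks)
        : mq_(max_num_tasks), thd_([this] {
              while (!done_) {
                  mq_.PopFront()();
              }
          }) {}

    ~Worker() {
        Send([this] { done_ = true; });  // runs on the worker thread
        thd_.join();
    }

    void Send(std::function<void()> m) { mq_.PushBack(std::move(m)); }

private:
    bool done_ = false;                       // only read and written on the worker thread
    BoundedQueue<std::function<void()>> mq_;  // constructed before the thread starts
    std::thread thd_;                         // declared last: starts running in the constructor
};
```

With this version, `Send()` simply blocks inside `PushBack()` until the consumer frees a slot. The thread member is declared last so that the worker thread never touches a member that has not been constructed yet.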