I wrote a "generic" multiple producer / consumer using C++11 (or higher) multithreading. The code (below) sort of works, but it hangs / crashes if too many producer / consumer threads are created.
The idea is to neatly separate the concerns: the MultiProducerConsumer object takes care of the protocols (thread maintenance, mutex, condvar) while the "user" injects the relevant functors doing the concrete work (producer, consumer, termination predicate) into the object.
Tested with VS 2017 and cygwin g++. The situation is worse on cygwin (why?). I cannot figure out what the problem is, and I could use a hint. Thanks in advance.
The header, multi_producer_consumer.hpp:
#pragma once
#include <algorithm>
#include <functional>
#include <iterator>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <cstddef>
//#include <cassert>
template<typename Container>
struct MultiProducerConsumer
{
using Type = typename Container::value_type;
using ModifierFct = std::function<void(Container&)>;
using DoneFctr = std::function<bool(const Container&)>;
MultiProducerConsumer(const Container& q,
ModifierFct producer,
ModifierFct consumer,
DoneFctr donef,
size_t n_producers,
size_t n_consumers):
m_queue(q),
m_pf(producer),
m_cf(consumer),
m_done(donef),
m_producers(n_producers),
m_consumers(n_consumers),
m_joined(false)
{
///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?
for (size_t i = 0; i < n_producers; ++i)
{
m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
}
for (size_t i = 0; i < n_consumers; ++i)
{
m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
}
}
virtual ~MultiProducerConsumer(void)
{
if (!m_joined)
join();
}
virtual bool done(void) const
{
std::lock_guard<std::mutex> lk(m_mutex);
return m_done(m_queue);
}
void join(void)
{
std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
m_joined = true;
}
protected:
virtual void produce(size_t i)
{
while (!done())
{
std::lock_guard<std::mutex> lk(m_mutex);
m_pf(m_queue);
///if (i == 0)//should only only one thread notify all the consumers...? nope
m_condvar.notify_all();//notifies all...not one
}
}
virtual void consume(size_t i)
{
while (!done())
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
m_cf(m_queue);
}
}
private:
Container m_queue;
ModifierFct m_pf;
ModifierFct m_cf;
DoneFctr m_done;
mutable std::mutex m_mutex;
std::condition_variable m_condvar;
std::vector<std::thread> m_producers;
std::vector<std::thread> m_consumers;
bool m_joined;
};
The tester, below, uses a queue of vectors that are being "produced" (simply moved from an "outside" queue, matrix, into the producer / consumer queue). The consumers "consume" the vectors by summing each of them and storing the sum into another "outside" container (sums). The whole process terminates when the first vector summing up to zero is encountered. Below is the code:
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <queue>
#include <numeric>
#include <iterator>
#include <cassert>
#include "multi_producer_consumer.hpp"
template<typename T>
using QVec = std::queue<std::vector<T>>;
template<typename T>
inline
T sum(const std::vector<T>& v)
{
return std::accumulate(v.begin(), v.end(), T{});
}
template<typename T>
T from_string(std::string&& str)
{
T ret;
std::stringstream ss(str);
ss >> ret;
return ret;
}
int main(int argc, char* argv[])
{
int n_p = 1;
int n_c = 1;
if (argc == 3)
{
n_p = from_string<int>(argv[1]);
n_c = from_string<int>(argv[2]);
}
const unsigned long max_n_threads = std::thread::hardware_concurrency();
std::cout << "max # threads: " << max_n_threads << "\n";
std::cout << "n_producers: " << n_p << ", n_consumers: " << n_c << "\n";
try {
std::vector<int> vstart(1, 1);
std::vector<int> vstop(1, 0);
std::queue<std::vector<int>> matrix;
matrix.push(vstart);
matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
matrix.push(std::vector<int>{ 6, 7, 8, 9 });
matrix.push(std::vector<int>{ 10, 11, 12, 13 });
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
std::vector<long> sums;
QVec<int> qqv;
//multi-producer-consumer that feeds vector from a queue
//to a consumer that sums them up, until sum is zero:
//
MultiProducerConsumer<QVec<int>> mpc(qqv,
[&matrix](QVec<int>& qv) { //producer function: move elements from matrix into qv
if (!matrix.empty())
{
auto v = matrix.front();
matrix.pop();
qv.push(v);
}
},
[&sums](QVec<int>& qv) { //consumer function: pop from qv and sum up elements
//if (!qv.empty())//this test is superfluous
//{
auto v = qv.front();
qv.pop();
sums.push_back(sum(v));
//}
},
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}, n_p, n_c);//1,1 => okay; 1,2 => okay; 2,2 => okay; 5,5 => okay on Win64; hangs on cygwin; 5,10 => it can hang
//need main thread to block until producers/consumers are done,
//so that matrix/sums are not destructed while
//producers/consumers are still trying to use them:
//
mpc.join();
std::cout << "sums:\n";
std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<long>(std::cout, "\n"));
}
catch (std::exception& ex)
{
std::cerr << ex.what() << "\n";
return 1;
}
catch (...)
{
std::cerr << "Unknown exception.\n";
return 1;
}
std::cout << "Done!" << std::endl;
return 0;
}
Something is wrong with it. Just cannot figure what.
[Edited] Follow up on Humphrey Winnebago's answer, especially on trying to fix Part 2: modified the granularity of producer / consumer operations to separate them from the queue maintenance. The essential part of the header below:
template<typename Container>
struct MultiProducerConsumer
{
using Type = typename Container::value_type;
using ModifierFct = std::function<void(Type&)>;
using DoneFctr = std::function<bool(const Container&)>;
MultiProducerConsumer(const Container& q,
ModifierFct producer,
ModifierFct consumer,
DoneFctr donef,
size_t n_producers,
size_t n_consumers):
m_queue(q),
m_pf(producer),
m_cf(consumer),
m_done(donef),
m_producers(n_producers),
m_consumers(n_consumers),
m_joined(false)
{
///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?
for (size_t i = 0; i < n_producers; ++i)
{
m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
}
for (size_t i = 0; i < n_consumers; ++i)
{
m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
}
}
virtual ~MultiProducerConsumer(void)
{
if (!m_joined)
join();
}
void join(void)
{
std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
m_joined = true;
}
protected:
// be careful with the virtual functions + overloading
virtual bool done(std::lock_guard<std::mutex>&) const
{
return m_done(m_queue);
}
virtual bool done(std::unique_lock<std::mutex>&) const
{
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (true) // 1
{
///std::lock_guard<std::mutex> lk(m_mutex);
std::unique_lock<std::mutex> lk(m_mutex);
if (done(lk)) // 2
break;
Type v;
//Part 2/2: Serious design flaws:
//should move producer work outside of critical section
//but this requires call below to be surrounded by unlock/lock:
//
lk.unlock();
m_pf(v);
lk.lock();
m_queue.push(v);
m_condvar.notify_all();
}
m_condvar.notify_all(); // 3. need to break any sleeping consumers
}
virtual void consume(size_t i)
{
while (true) // 1
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
if (done(lk)) // 2 & 3
break;
auto v = m_queue.front();
m_queue.pop();
//Consumer fix for Part 2 / 2: Serious design flaws:
//
lk.unlock();
//Consumer fix for Part 2 / 2: Serious design flaws:
//move outside this critical section
//
m_cf(v);
}
}
private:
Container m_queue;
ModifierFct m_pf;
ModifierFct m_cf;
DoneFctr m_done;
mutable std::mutex m_mutex;
std::condition_variable m_condvar;
std::vector<std::thread> m_producers;
std::vector<std::thread> m_consumers;
bool m_joined;
};
Of course, now there's a race condition on the containers "matrix" and "sums" in the tester, but that is a separate issue from the MPC abstraction. Yes, the code ends up being serialized again, but only because production uses an outside shared resource and consumption uses yet another shared resource. If production used, say, a random number generator and consumption stored results in separate locations, then the producers / consumers would not be serialized. The (essential part of the) new tester code with (what I hope is) the fix for Part 2:
//...
try {
std::vector<int> vstart(1, 1);
std::vector<int> vstop(1, 0);
std::queue<std::vector<int>> matrix;
matrix.push(vstart);
matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
matrix.push(std::vector<int>{ 6, 7, 8, 9 });
matrix.push(std::vector<int>{ 10, 11, 12, 13 });
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
std::vector<long> sums;
QVec<int> qqv;
//now matrix and sum need to be protected
//but they're not the multi-producer-consumer's (MPC)
//responsibility anymore;
//
std::mutex sum_mutex;
std::mutex matrix_mutex;
//multi-producer-consumer that feeds vector from a queue
//to a consumer that sums them up, until sum is zero:
//
MultiProducerConsumer<QVec<int>> mpc(qqv,
[&matrix_mutex, &matrix](std::vector<int>& v) { //producer function: move elements from matrix into qv
std::lock_guard<std::mutex> guard(matrix_mutex); //lock before testing matrix, so the empty() check is not racy
if (!matrix.empty())
{
v = matrix.front();
matrix.pop();
}
},
[&sum_mutex, &sums](std::vector<int>& v) { //consumer function: pop from qv and sum up elements
long s = sum(v);
std::lock_guard<std::mutex> guard(sum_mutex);
sums.push_back(s);
},
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}, n_p, n_c);
//need main thread to block until producers/consumers are done,
//so that matrix/sums are not destructed while
//producers/consumers are still trying to use them:
//
mpc.join();
std::cout << "sums:\n";
std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<long>(std::cout, "\n"));
}
catch (std::exception& ex)
{
std::cerr << ex.what() << "\n";
return 1;
}
catch (...)
{
std::cerr << "Unknown exception.\n";
return 1;
}
//...
--- Part 1/2: Answer to why it's not working ---
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
Yes it is (sometimes). I found that when it hangs, it's because a producer sucked up that last item.
You have a flaw in the generalized producer and consumer functions. Using producer as an example:
virtual bool done(void) const
{
std::lock_guard<std::mutex> lk(m_mutex);
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (!done()) // <---- HERE to...
{
std::lock_guard<std::mutex> lk(m_mutex); // <----- ...HERE. done() condition may not hold, as mutex was released
m_pf(m_queue);
m_condvar.notify_all();
}
}
You grab the mutex, check the condition, then release the mutex. Then grab the mutex again, assuming that the condition still holds. This IS a race condition. It's not the kind of race condition that will corrupt your data structures, but it is still a kind of race condition.
Under different circumstances this might actually work, but in this case it interacts poorly with your consumer's wait condition and your "done predicate".
"done predicate":
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}
Fixes:
// be careful with the virtual functions + overloading
virtual bool done(std::lock_guard<std::mutex>&) const
{
return m_done(m_queue);
}
virtual bool done(std::unique_lock<std::mutex>&) const
{
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (true) // 1
{
std::lock_guard<std::mutex> lk(m_mutex);
if (done(lk)) // 2
break;
m_pf(m_queue);
m_condvar.notify_all();
}
m_condvar.notify_all(); // 3. need to break any sleeping consumers
}
virtual void consume(size_t i)
{
while (true) // 1
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
if (done(lk)) // 2 & 3
break;
m_cf(m_queue);
}
}
That should be enough to fix it, but you could also patch up your "done" predicate
[&matrix](const QVec<int>& qv) { // 1
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
assert(!matrix.empty()); // 2
// or... if (matrix.empty()) throw, since you'll probably want to test in release mode
return false;
}
--- Part 2/2: Serious design flaws ---
Your producers and consumers own the mutex while they're doing the work. This isn't concurrent programming at all. It's nothing more than complicated sequential programming. There is no difference between your algorithm and one that just produces everything at once (on one thread) and then consumes it all (on the same thread), except that the latter has 100x less bug potential.
You should finish the critical section as fast as possible. Grab the mutex, touch shared data, and release. Then work on the data thread-locally.
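As a minimal sketch of that pattern (the names SharedQueue and sum_next are hypothetical, made up for illustration, and are not part of the code in the question): the lock is held only long enough to move one item out of the shared queue, and the summing happens afterwards on a thread-local vector.

```cpp
#include <mutex>
#include <numeric>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical shared state, for illustration only.
struct SharedQueue
{
    std::mutex mutex;
    std::queue<std::vector<int>> items;
};

// Returns false if the queue was empty; otherwise stores the sum of the
// next item in `out`.
bool sum_next(SharedQueue& q, long& out)
{
    std::vector<int> local;
    {
        std::lock_guard<std::mutex> lk(q.mutex); // critical section starts
        if (q.items.empty())
            return false;
        local = std::move(q.items.front());      // move, don't copy
        q.items.pop();
    }                                            // mutex released here
    // Work on thread-local data; other threads are free to use the queue.
    out = std::accumulate(local.begin(), local.end(), 0L);
    return true;
}
```

The inner braces exist only to end the lock_guard's lifetime before the accumulate call.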
You're copying and working while holding the mutex. It boggles my mind. You really need to re-think your design.
Move the enqueue and dequeue operations to produce and consume. Only grab the mutex for enqueue and dequeue. And don't copy...use move operations. Change your "done predicate" so it's not doing a lot of work.
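One way those suggestions could fit together is sketched below. It is not a drop-in replacement for MultiProducerConsumer: the WorkQueue class, its push / pop / close members, and the sum_rows driver are hypothetical names introduced for illustration. It also assumes termination can be signalled by an explicit "closed" flag rather than by inspecting the front of the queue, so the termination check costs one boolean read.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical queue: the mutex is held only for enqueue and dequeue.
template<typename T>
class WorkQueue
{
public:
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_items.push(std::move(item));
        }
        m_condvar.notify_one();
    }

    // Signals that no more items will be pushed.
    void close()
    {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_closed = true;
        }
        m_condvar.notify_all();
    }

    // Blocks until an item is available, or until the queue is closed and
    // drained. Returns false in the latter case.
    bool pop(T& out)
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_condvar.wait(lk, [this] { return m_closed || !m_items.empty(); });
        if (m_items.empty())
            return false;
        out = std::move(m_items.front());
        m_items.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condvar;
    std::queue<T> m_items;
    bool m_closed = false;
};

// Hypothetical driver: producers enqueue rows, consumers sum them.
long sum_rows(std::vector<std::vector<int>> rows,
              std::size_t n_producers, std::size_t n_consumers)
{
    WorkQueue<std::vector<int>> queue;
    std::mutex total_mutex;
    long total = 0;

    std::vector<std::thread> producers;
    for (std::size_t p = 0; p < n_producers; ++p)
        producers.emplace_back([&, p] {
            // Each producer takes a disjoint, strided slice of the input.
            for (std::size_t i = p; i < rows.size(); i += n_producers)
                queue.push(std::move(rows[i]));
        });

    std::vector<std::thread> consumers;
    for (std::size_t c = 0; c < n_consumers; ++c)
        consumers.emplace_back([&] {
            std::vector<int> row;
            while (queue.pop(row))
            {
                long s = std::accumulate(row.begin(), row.end(), 0L); // no lock held
                std::lock_guard<std::mutex> lk(total_mutex);
                total += s;
            }
        });

    for (auto& t : producers) t.join();
    queue.close(); // all producers have finished
    for (auto& t : consumers) t.join();
    return total;
}
```

Because pop waits on "closed or non-empty", a consumer that is asleep when the producers finish is woken by close() and exits instead of hanging.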