I am trying to achieve Single producer Multiple consumer , however below code is not able to compile. Can someone help with this error ? Also would it work to wake all threads from this pool & a random thread would be able to acquire the lock?
` threadPool/main.cpp:4: /Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/thread:364:17:
error: no matching constructor for initialization of '_Gp' (aka 'tuple<unique_ptrstd::__1::__thread_struct, void (TestClass::*)(), TestClass>') new _Gp(std::move(__tsp), ^ ~~~~~~~~~~~~~~~~~
ls/usr/bin/../include/c++/v1/type_traits:2422:12: error: call to implicitly-deleted copy constructor of 'typename
decay::type' (aka 'TestClass') return _VSTD::forward<_Tp>(__t);
----------------------------------------------
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
using namespace std;
class TestClass{
public:
void producer(int i) {
unique_lock<mutex> lockGuard(mtx);
Q.push(i);
lockGuard.unlock();
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this]() {
return !Q.empty();
});
cout<<this_thread::get_id();
cout<<Q.front()<<endl;
Q.pop();
lockGuard.unlock();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
int MAX_THREADS = std::thread::hardware_concurrency()-1;
vector<thread> ThreadVector;
TestClass testObj;
for(int i=0; i<MAX_THREADS; i++){
ThreadVector.emplace_back(&TestClass::consumer, std::move(testObj));
cout<<"Pool threadID:" <<ThreadVector[i].get_id()<<endl;
}
TestClass testObj2;
for(int i=0; i<10; i++) {
testObj.producer(i);
}
for(auto &&t : ThreadVector) {
t.join();
}
return 0;
}
`
Another version to call threads
int main()
{
std::vector<std::thread> vecOfThreads;
std::function<void(TestClass&)> func = [&](TestClass &obj) {
while(1) {
obj.consumer();
}
};
unsigned MAX_THREADS = std::thread::hardware_concurrency()-1;
TestClass obj;
for(int i=0; i<MAX_THREADS; i++) {
std::thread th1(func, std::ref(obj));
vecOfThreads.emplace_back(std::move(th1));
}
TestClass prod;
for(int i=0; i<10; i++) {
prod.producer(i);
}
for (std::thread & th : vecOfThreads)
{
if (th.joinable())
th.join();
}
return 0;
}
std::move(testObj)
should be &testObj
(a pointer to the object to call consumer
on) - or std::ref(testobj)
(which becomes a reference_wrapper
(holding a pointer to the object too).produce
at least as many times as you have threads, or else the program won't finish.unlock
manually. The guards unlock automatically when they go out of scope.Example:
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx); // here a lock_guard is enough
Q.push(i);
// no manual unlocking
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this] { return !Q.empty(); });
cout << this_thread::get_id();
cout << Q.front() << endl;
Q.pop();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS); // since you know how many, reserve
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
// here, &testobj
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
// produce MAX_THREADS of things to put in the queue:
for(int i = 0; i < MAX_THREADS; i++) {
testObj.producer(i);
}
for(auto&& t : ThreadVector) {
t.join();
}
}
Regarding your questions in the comment section: If you'd like to keep the consumer
threads running until you tell them to quit, you could add another variable (called run
here) that the consumer
threads monitor.
Example:
#include <condition_variable>
#include <iostream>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx);
Q.push(i);
to_pool.notify_one();
}
void consumer() {
while(true) {
unique_lock<mutex> lockGuard(mtx);
to_pool.wait(lockGuard, [this] { return !run || !Q.empty(); });
if(!run) break; // time to quit
cout << this_thread::get_id() << ' ' << Q.front() << endl;
Q.pop();
// Tell producer that we picked one from the queue.
// if it's only interesting to notify when the queue is empty,
// add: if(Q.empty())
to_producer.notify_one();
}
};
void stop() {
lock_guard<mutex> lockGuard(mtx);
run = false; // tell all pool threads to quit
to_pool.notify_all();
}
void wait_for_all_work_to_be_done() {
std::unique_lock<mutex> lg(mtx);
to_producer.wait(lg, [this] { return Q.empty(); });
}
private:
bool run = true;
mutex mtx;
condition_variable to_pool;
condition_variable to_producer;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS);
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
for(int i = 0; i < MAX_THREADS / 2; i++) {
testObj.producer(i);
}
testObj.wait_for_all_work_to_be_done();
// stop pool threads
testObj.stop();
for(auto&& t : ThreadVector) t.join();
}