I'm trying to write a dirt simple threadpool to learn how they work under the hood. I've run into a problem, though. When I use my condition_variable and call notify_all(), it only wakes up one thread in my pool.
Everything else works fine. I've queued up 900 jobs, each with a decent payload. The one thread that wakes up consumes all those jobs, then goes back to sleep. On the next loop this all happens again.
The problem is that only one thread does the work! How have I messed up templating this?
ThreadPool.h:
#pragma once
#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>
class ThreadPool
{
    friend void __stdcall ThreadFunc();
public:
    static ThreadPool & GetInstance()
    {
        static ThreadPool sInstance;
        return (sInstance);
    }
public:
    void AddJob(Job * job);
    void DoAllJobs();
private:
    Job * GetJob();
private:
    const static uint32_t ThreadCount = 8;
    std::mutex JobMutex;
    std::stack<Job *> Jobs;
    volatile std::atomic<int> JobWorkCounter;
    std::mutex SharedLock;
    std::thread Threads[ThreadCount];
    std::condition_variable Signal;
private:
    ThreadPool();
    ~ThreadPool();
public:
    ThreadPool(ThreadPool const &) = delete;
    void operator = (ThreadPool const &) = delete;
};
ThreadPool.cpp:
#include "ThreadPool.h"
void __stdcall ThreadFunc()
{
    std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
    while (true)
    {
        ThreadPool::GetInstance().Signal.wait(lock);
        while (Job * job = ThreadPool::GetInstance().GetJob())
        {
            job->_jobFn(job->_args);
            ThreadPool::GetInstance().JobWorkCounter--;
        }
    }
}
ThreadPool::ThreadPool()
{
    JobWorkCounter = 0;
    for (uint32_t i = 0; i < ThreadCount; ++i)
        Threads[i] = std::thread(ThreadFunc);
}
ThreadPool::~ThreadPool()
{
}
void ThreadPool::AddJob(Job * job)
{
    JobWorkCounter++;
    JobMutex.lock();
    {
        Jobs.push(job);
    }
    JobMutex.unlock();
}
void ThreadPool::DoAllJobs()
{
    Signal.notify_all();
    while (JobWorkCounter > 0)
    {
        Sleep(0);
    }
}
Job * ThreadPool::GetJob()
{
    Job * return_value = nullptr;
    JobMutex.lock();
    {
        if (Jobs.empty() == false)
        {
            return_value = Jobs.top();
            Jobs.pop();
        }
    }
    JobMutex.unlock();
    return (return_value);
}
Thanks for any help! Sorry for the big code post.
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
Each thread acquires this mutex, first.
ThreadPool::GetInstance().Signal.wait(lock);
All threads will receive the signal from the condition variable when the main thread calls notify_all(), but you are forgetting one crucial detail: after waking up from a condition variable notification, the mutex gets automatically re-locked. That's how wait() works; see its documentation in your C++ book or the manual pages. Only one thread at a time will be able to do that. All the other threads that wake up will also try to lock the mutex, but only the first one wins the race, and all the other threads will sleep and continue dreaming.
A thread that has been notified will not return from wait() until that thread also successfully relocks the mutex.
To return from wait(), two things must happen: the thread gets notified through the condition variable, and the thread successfully relocks the mutex. wait() unlocks the mutex and waits on the condition variable, atomically, and relocks the mutex when it is notified.
So, the lucky thread will lock the mutex and proceed to drain the queue of all the jobs, then go back to the top of the loop and wait() again. This unlocks the mutex, and now some other lucky thread, one that's been notified but has been waiting patiently for its chance to bask in sunlight and glory, will be able to lock the mutex. In this manner, all the other threads will take turns, elephant-style, waking up, checking the job queue, finding nothing there, and going back to sleep.
This is the reason why you're seeing this behavior.
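To make the serialization concrete, here is a minimal sketch, separate from the code in the question, in which several threads are woken by one notify_all() and each does its "work" while still holding the mutex it waited on. The function name MaxWorkersInsideLock and the go flag are hypothetical names made up for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical demo: returns the largest number of threads that were ever
// past wait() at the same time while still holding the shared mutex.
int MaxWorkersInsideLock(int threadCount)
{
    std::mutex m;
    std::condition_variable cv;
    bool go = false;     // guarded by m; avoids a lost or spurious wakeup
    int inside = 0;      // guarded by m
    int maxInside = 0;   // guarded by m

    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i)
    {
        threads.emplace_back([&] {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return go; });
            // wait() only returns once this thread has relocked m.
            assert(lock.owns_lock());
            ++inside;
            maxInside = std::max(maxInside, inside);
            // Simulated work, done while still holding the mutex.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            --inside;
        });
    }

    {
        std::lock_guard<std::mutex> guard(m);
        go = true;
    }
    cv.notify_all(); // every waiter is notified, but they leave wait() one by one

    for (auto & t : threads)
        t.join();
    return maxInside;
}
```

Because each thread keeps the mutex for the whole of its work, the returned value is 1 no matter how many threads were notified.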
There are a few basic things that must be done to make the shown code work correctly across all threads.
1) You do not need two mutexes; one will be perfectly sufficient.
2) Before wait()ing on the condition variable, check if there's something in the job queue. If there is something, remove it, unlock the mutex, and then do the job.
3) wait() only if the job queue is empty. When wait() returns, the mutex is locked again, so check whether the job queue is still empty (you're not really guaranteed at this point that it's not empty, only that it may be non-empty).
You only need one mutex, both to protect access to the non-thread-safe job queue and to wait on the condition variable.
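The points above could be put together roughly as follows. This is only a sketch under several assumptions, not a drop-in replacement for the class in the question: SimplePool, WaitAll, RunDemo and the member names are hypothetical, jobs are std::function<void()> rather than Job pointers, and a FIFO queue stands in for the stack.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool: a single mutex guards the queue and is used for waiting.
class SimplePool
{
public:
    explicit SimplePool(unsigned threadCount)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            workers_.emplace_back([this] { WorkerLoop(); });
    }
    ~SimplePool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        signal_.notify_all();
        for (auto & t : workers_)
            t.join();
    }
    void AddJob(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
            ++pending_;
        }
        signal_.notify_one();
    }
    // Blocks until every job added so far has finished running.
    void WaitAll()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        done_.wait(lock, [this] { return pending_ == 0; });
    }
private:
    void WorkerLoop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (true)
        {
            // Sleep only while there is nothing to do; the predicate is
            // checked before waiting and again after every wakeup.
            signal_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
            assert(lock.owns_lock());
            if (jobs_.empty())
                return; // stopping, and the queue has been drained
            std::function<void()> job = std::move(jobs_.front());
            jobs_.pop();
            lock.unlock(); // run the job without holding the mutex
            job();
            lock.lock();
            if (--pending_ == 0)
                done_.notify_all();
        }
    }
    std::mutex mutex_;
    std::condition_variable signal_; // "there may be work"
    std::condition_variable done_;   // "all queued work has finished"
    std::queue<std::function<void()>> jobs_;
    unsigned pending_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical usage: queue 900 small jobs and count how many ran.
int RunDemo()
{
    std::atomic<int> counter{0};
    {
        SimplePool pool(4);
        for (int i = 0; i < 900; ++i)
            pool.AddJob([&counter] { ++counter; });
        pool.WaitAll();
    }
    return counter.load();
}
```

The important design choice is that the mutex is released before job() runs, so other workers can take jobs from the queue while this one is busy. The second condition variable is one possible replacement for the Sleep(0) polling loop in DoAllJobs().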