
Efficiently waiting for all tasks in a threadpool to finish


I currently have a program with x workers in my thread pool. During the main loop, y tasks are assigned to the workers to complete, but after the tasks are sent out I must wait for them all to finish before proceeding with the program. I believe my current solution is inefficient; there must be a better way to wait for all tasks to finish, but I am not sure how to go about it.

// called in main after all tasks are enqueued to 
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
    while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
    {
        //do literally nothing
    }
}

More information:

threadpool structure

//worker thread objects
class Worker {
    public:
        Worker(ThreadPool& s): pool(s) {}
        void operator()();
    private:
        ThreadPool &pool;
};

//thread pool
class ThreadPool {
    public:
        ThreadPool(size_t);
        template<class F>
        void enqueue(F f);   
        void waitFinished();
        ~ThreadPool();
    private:
        friend class Worker;
        //keeps track of threads so we can join
        std::vector< std::thread > workers;
        //task queue
        std::deque< std::function<void()> > tasks;
        //sync
        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
};

or here's a gist of my threadpool.hpp

example of what I want to use waitFinished() for:

while(running)
    //....
    for all particles alive
        push particle position function to threadpool
    end for

    threadPool.waitFinished();

    push new particle position data into OpenGL buffer
end while

This way I can send hundreds of thousands of particle position tasks to be done in parallel, wait for them to finish, and put the new data into the OpenGL position buffers.


Solution

  • This is one way to do what you're trying to do. Using two condition variables on the same mutex is not for the faint-hearted unless you know what is going on internally. I didn't need the atomic processed member other than to demonstrate how many items were finished between each run.

    The sample workload function in this generates 100,000 random int values, then sorts them (gotta heat my office one way or another). waitFinished will not return until the queue is empty and no threads are busy.

    #include <iostream>
    #include <vector>
    #include <deque>
    #include <functional>
    #include <thread>
    #include <condition_variable>
    #include <mutex>
    #include <atomic>
    #include <algorithm>
    #include <iterator>
    #include <random>
    #include <cstdlib>
    
    //thread pool
    class ThreadPool
    {
    public:
        ThreadPool(unsigned int n = std::thread::hardware_concurrency());
    
        template<class F> void enqueue(F&& f);
        void waitFinished();
        ~ThreadPool();
    
        unsigned int getProcessed() const { return processed; }
    
    private:
        std::vector< std::thread > workers;
        std::deque< std::function<void()> > tasks;
        std::mutex queue_mutex;
        std::condition_variable cv_task;
        std::condition_variable cv_finished;
        std::atomic_uint processed;
        unsigned int busy;
        bool stop;
    
        void thread_proc();
    };
    
    ThreadPool::ThreadPool(unsigned int n)
        : processed()
        , busy()
        , stop()
    {
        for (unsigned int i=0; i<n; ++i)
            workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
    }
    
    ThreadPool::~ThreadPool()
    {
        // set stop-condition
        std::unique_lock<std::mutex> latch(queue_mutex);
        stop = true;
        cv_task.notify_all();
        latch.unlock();
    
        // all threads terminate, then we're done.
        for (auto& t : workers)
            t.join();
    }
    
    void ThreadPool::thread_proc()
    {
        while (true)
        {
            std::unique_lock<std::mutex> latch(queue_mutex);
            cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
            if (!tasks.empty())
            {
                // got work. set busy.
                ++busy;
    
                // pull from queue
                auto fn = tasks.front();
                tasks.pop_front();
    
                // release lock. run async
                latch.unlock();
    
                // run function outside context
                fn();
                ++processed;
    
                latch.lock();
                --busy;
                cv_finished.notify_one();
            }
            else if (stop)
            {
                break;
            }
        }
    }
    
    // generic function push
    template<class F>
    void ThreadPool::enqueue(F&& f)
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        tasks.emplace_back(std::forward<F>(f));
        cv_task.notify_one();
    }
    
    // waits until the queue is empty.
    void ThreadPool::waitFinished()
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
    }
    
    // a cpu-busy task.
    void work_proc()
    {
        std::random_device rd;
        std::mt19937 rng(rd());
    
        // build a vector of random numbers
        std::vector<int> data;
        data.reserve(100000);
        std::generate_n(std::back_inserter(data), 100000, [&](){ return rng(); });
        std::sort(data.begin(), data.end(), std::greater<int>());
    }
    
    int main()
    {
        ThreadPool tp;
    
        // run five batches of 100 items
        for (int x=0; x<5; ++x)
        {
            // queue 100 work tasks
            for (int i=0; i<100; ++i)
                tp.enqueue(work_proc);
    
            tp.waitFinished();
            std::cout << tp.getProcessed() << '\n';
        }
    
        // destructor will close down thread pool
        return EXIT_SUCCESS;
    }
    

    Output

    100
    200
    300
    400
    500
    

    Best of luck.