c++ multithreading mutex producer-consumer barrier

Synchronizing producer/consumer with a barrier


I have a producer thread that produces work for three consumer threads. Once the work has been produced, the producer thread waits until the consumer threads have finished handling it, and then goes on to handle the results.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <boost/thread/barrier.hpp>
#include <vector>
#include <queue>


std::condition_variable cond;
std::mutex mutex;
boost::barrier barrier(4);

std::vector<std::thread> workers;
std::queue<unsigned int> work;
std::queue<unsigned int> results;

void worker();

int main()
{
    // 1 producer and 3 consumers
    for(unsigned int i = 0; i < 3; i++)
        workers.push_back(std::thread(worker));

    // Wait here so the three workers can get to cond.wait();
    barrier.wait();

    std::unique_lock<std::mutex> lock(mutex);
    while(true)
    {
        // Generate work
        std::cout << "gen" << std::endl;
        for(unsigned int i = 0; i < 10; i++)
            work.push(i);

        cond.notify_all();

        lock.unlock();
        barrier.wait();

        // Handle the results
        while(results.size() > 0)
            results.pop();

        lock.lock();
    }

    return 0;
}

void worker()
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(mutex);
        while(results.size() == 0)
        {
            lock.unlock();
            barrier.wait();
            lock.lock();
            cond.wait(lock);
        }

        // Get work
        unsigned int next = work.front();
        work.pop();

        // Store the result
        results.push(next);

        lock.unlock();


    }
}

The problem is that I need to make sure that all consumer threads have entered cond.wait(lock) before the producer thread starts its next iteration:

  1. All 4 threads have reached the barrier. The barrier gets released and the threads can continue.
  2. The producer thread locks the mutex before all consumer threads have reached cond.wait(lock). Thus at least one consumer thread is blocked by lock.lock().
  3. The producer thread starts its next iteration, creates work and notifies the consumers. Since at least one consumer thread has not yet reached cond.wait(lock) the notify_all() will be missed by at least one consumer thread. These threads now wait for the next notify_all() - which will never arrive.
  4. The next time the barrier is reached, at least one consumer thread is still waiting for a notify_all(). The barrier is therefore never released and the program deadlocks.
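
Step 3 comes down to the fact that a condition variable does not remember a notification that arrives while nobody is waiting. The following is a minimal sketch that isolates this behaviour in a single thread. The function names and the `ready` flag are hypothetical and not part of the code above, and the 50 ms timeout is only there so that the first function returns instead of blocking forever.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: notify first, wait afterwards, with no shared flag.
// Returns true if the wait timed out, i.e. the earlier notification was lost.
bool wait_without_flag_times_out()
{
    std::mutex m;
    std::condition_variable cv;

    cv.notify_all();  // no thread is waiting yet, so nothing is woken

    std::unique_lock<std::mutex> lock(m);
    // A spurious wake-up could end this wait early; normally it times out.
    return cv.wait_for(lock, std::chrono::milliseconds(50)) == std::cv_status::timeout;
}

// Same order of events, but the notifier also records the event in a flag.
// Returns true if the waiter observed the flag.
bool wait_with_flag_succeeds()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    {
        std::lock_guard<std::mutex> guard(m);
        ready = true;  // the state change persists even if nobody is waiting
    }
    cv.notify_all();

    std::unique_lock<std::mutex> lock(m);
    // The predicate is evaluated before blocking, so this returns immediately.
    return cv.wait_for(lock, std::chrono::milliseconds(50), [&] { return ready; });
}
```

The second function shows why recording the event in shared state matters: a late waiter checks the state instead of relying on having been asleep at the moment of the notification.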

How can I resolve this situation?


Solution

  • A condition_variable should always be used together with a flag (a predicate). The flag guards against spurious wake-ups, and it can also tell a thread whether it needs to wait at all or can go straight to work.

    Add a global bool go_to_work = false; then pass it as a predicate in the call to wait, and set and reset it from the main thread.

    In the main thread, set the flag before calling notify_all (the main thread still holds the mutex at this point):

    go_to_work=true;
    cond.notify_all();
    

    In the worker thread, add the predicate to the wait call:

    cond.wait(lock, [](){ return go_to_work; });
    

    Lastly, in the main thread, set the flag back to false after all the work has been done:

    barrier.wait();
    lock.lock();  // We need to lock the mutex before modifying the bool
    go_to_work=false;
    lock.unlock();
    
    //Handle result...
    

    Now, if a thread reaches the wait call after the main thread has set go_to_work=true, it does not wait at all and goes straight to the work. As a bonus, this also guards against spurious wake-ups.
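
For reference, here is a self-contained sketch of the same idea taken one step further. It is not the code from the question: it assumes the barrier can be replaced by a second condition variable, and it uses the state of the work queue itself as the predicate instead of a separate bool. The names run_rounds, work_ready, round_done, in_flight and stop are hypothetical, the loop runs a fixed number of rounds so that it terminates, and doubling the value stands in for real processing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: runs `rounds` produce/consume cycles with `num_workers`
// consumers and returns the total number of results the producer collected.
std::size_t run_rounds(unsigned rounds, unsigned num_workers, unsigned items_per_round)
{
    std::mutex mutex;
    std::condition_variable work_ready;  // producer -> consumers
    std::condition_variable round_done;  // consumers -> producer
    std::queue<unsigned> work, results;
    unsigned in_flight = 0;  // items taken from the queue but not yet finished
    bool stop = false;
    std::size_t total = 0;

    auto worker = [&] {
        std::unique_lock<std::mutex> lock(mutex);
        while (true) {
            // The predicate is checked before blocking, so a worker that
            // arrives after notify_all() still sees the queued work.
            work_ready.wait(lock, [&] { return stop || !work.empty(); });
            if (work.empty())
                return;  // stop was requested and nothing is left to do

            unsigned next = work.front();
            work.pop();
            ++in_flight;

            lock.unlock();
            unsigned result = next * 2;  // stand-in for real processing
            lock.lock();

            results.push(result);
            --in_flight;
            if (work.empty() && in_flight == 0)
                round_done.notify_one();  // the last item of this round is done
        }
    };

    std::vector<std::thread> workers;
    for (unsigned i = 0; i < num_workers; ++i)
        workers.emplace_back(worker);

    for (unsigned r = 0; r < rounds; ++r) {
        std::unique_lock<std::mutex> lock(mutex);
        for (unsigned i = 0; i < items_per_round; ++i)
            work.push(i);
        work_ready.notify_all();

        // Plays the role of the barrier: wait until every item is processed.
        round_done.wait(lock, [&] { return work.empty() && in_flight == 0; });
        assert(results.size() == items_per_round);

        // Handle the results
        while (!results.empty()) {
            results.pop();
            ++total;
        }
    }

    {
        std::lock_guard<std::mutex> guard(mutex);
        stop = true;
    }
    work_ready.notify_all();
    for (auto& t : workers)
        t.join();
    return total;
}
```

Calling run_rounds(3, 3, 10) returns 30, since each of the three rounds yields one result per work item. Because both waits test shared state under the mutex, it no longer matters whether a thread reaches its wait call before or after the matching notification.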