Search code examples
multithreadingc++11condition-variable

Using condition variable to trigger threads one at a time


Given this example code taken and modified from cplusplus.com

#include "stdafx.h"
// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready)
    {
        std::cout << "waiting for unlock";
        cv.wait(lck);
    }

    std::cout << "thread " << id << '\n';
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    cv.notify_one();
}

void go() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_one();
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i<10; ++i)
        threads[i] = std::thread(print_id, i);

    std::cout << "10 threads ready to race...\n";
    go();                       // go!

    for (auto& th : threads) th.join();

    return 0;
}

Does this properly block each thread until one is woken up by the notify_one() call? or is this essentially unlocking all threads?

I am attempting to start all the threads up at the same time, then block until the previous one finishes. The order is not important, but i can't have them executing the meat of the code all at the same time.


Solution

  • Your code works in the sense that no two worker threads can execute this code:

      std::cout << "thread " << id << '\n';
      std::this_thread::sleep_for(std::chrono::milliseconds(2000));
      cv.notify_one();
    

    concurrently, but this is not entirely because your use of std::condition_variable unblocks the worker threads one at a time. In fact, you haven't really changed the behavior from the original cplusplus.com example. In both programs the critical section of code is protected by a locked mutex, which guarantees that only one thread can hold the lock at any time.

    Your use of std::condition_variable to "serialize" the execution of the threads does not by itself prevent concurrent execution because calling notify_one() doesn't guarantee that only one thread is released from wait(). The reason is that threads are allowed to spuriously return from wait() without any notification:

    The function will unblock when signaled by a call to notify_one() or a call to notify_all(), or spuriously.

    In a sense you are being saved by the mutex. If you wrote your worker function like this:

    void print_id(int id) {
        {
            std::unique_lock<std::mutex> lck(mtx);
            while (!ready)
            {
                std::cout << "waiting for unlock";
                cv.wait(lck);
            }
        }
    
        std::cout << "thread " << id << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        cv.notify_one();
    }
    

    This code unlocks the mutex as soon as the thread is unblocked. This would not be safe because multiple threads can reach your critical section without the protection of the locked mutex.

    So I think your program does what you want - runs a section of code in many threads without concurrency - but perhaps not for the reason you expect.


    The idiomatic way of blocking on a condition is to define a predicate test that passes when the thread is ready to proceed but fails otherwise, and check it in a loop:

    std::unique_lock<std::mutex> lock(mutex);
    while (!predicate)
        condition.wait(lock);
    

    This idiom handles spurious wake-ups properly because the predicate test will fail and the thread will call wait() again.

    Although your program looks very similar to this, it doesn't quite do this because your predicate allows all the threads to proceed, not just one, and that isn't what you stated that you want. It turns out to work anyway because of the locked mutex.

    You could unblock your threads one at a time in order by changing your predicate test so that it passes only for the next thread, and by using notify_all():

    #include <atomic>
    ...
    std::atomic<int> ready(-1);
    
    void print_id(int id) {
       {
          std::unique_lock<std::mutex> lck(mtx);
          while (ready != id)
             cv.wait(lck);
       }
    
       std::cout << "thread " << id << '\n';
       std::this_thread::sleep_for(std::chrono::milliseconds(2000));
       ++ready;
       cv.notify_all();
    }
    
    void go() {
       ready = 0;
       cv.notify_all();
    }
    
    int main()
    {
       std::thread threads[10];
       // spawn 10 threads:
       for (int i = 0; i<10; ++i)
          threads[i] = std::thread(print_id, i);
    
       std::cout << "10 threads ready to race...\n";
       go();                       // go!
    
       for (auto& th : threads) th.join();
    
       return 0;
    }
    

    Note how this predicate test ready != id only passes when the thread should proceed. I also used a std::atomic<int> for ready so that notification does not require locking the mutex.

    This revised predicate code is correct, but one drawback is that we have to change notify_one() to notify_all() to make sure that we wake up the next thread. This wakes up all the threads only to put all but one of them back to waiting, which costs a little bit of performance. One way to optimize this would be to create N condition_variable instances (e.g. in an array) and have each thread wait on its own condition_variable instance.