Search code examples
c++multithreadingasynchronouscondition-variable

Using a single Condition Variable to pause multiple threads


I have a program that starts N number of threads (async/future). I want the main thread to set up some data, then all threads should go while the main thread waits for all of the other threads to finish, and then this needs to loop.

What I have atm is something like this

int main()
{  
    //Start N new threads (std::future/std::async)
    while(condition)
    {
       //Set Up Data Here
       //Send Data to threads
       {
           std::lock_guard<std::mutex> lock(mrun);
           bRun = true;

       }
       run.notify_all();
       //Wait for threads
       {
           std::unique_lock<std::mutex> lock(mrun);
           run.wait(lock, [] {return bDone; });
       }
       //Reset bools
       bRun = false;
       bDone = false;
    }
    //Get results from futures once complete
}

int thread()
{
    while(otherCondition)
    {
       std::unique_lock<std::mutex> lock(mrun);
       run.wait(lock, [] {return bRun; });
       bDone = true;
       //Do thread stuff here
       lock.unlock();
       run.notify_all();
    }
}

But I can't see any signs of either the main or the other threads waiting for each other! Any idea what I am doing wrong or how I can do this?


Solution

  • There are a couple of problems. First, you're setting bDone as soon as the first worker wakes up. Thus the main thread wakes immediately and begins readying the next data set. You want to have the main thread wait until all workers have finished processing their data. Second, when a worker finishes processing, it loops around and immediately checks bRun. But it can't tell if bRun == true means that the next data set is ready or if the last data set is ready. You want to wait for the next data set.

    Something like this should work:

    std::mutex mrun;
    std::condition_variable dataReady;
    std::condition_variable workComplete;
    
    int nCurrentIteration = 0;
    int nWorkerCount = 0;
    
    int main()
    {
      //Start N new threads (std::future/std::async)
      while(condition)
      {
        //Set Up Data Here
        //Send Data to threads
        {
           std::lock_guard<std::mutex> lock(mrun);
           nWorkerCount = N;
           ++nCurrentIteration;
        }
        dataReady.notify_all();
        //Wait for threads
        {
           std::unique_lock<std::mutex> lock(mrun);
           workComplete.wait(lock, [] { return nWorkerCount == 0; });
        }
      }
      //Get results from futures once complete
    }
    
    int thread()
    {
      int nNextIteration == 1;
    
      while(otherCondition)
      {
        std::unique_lock<std::mutex> lock(mrun);
        dataReady.wait(lock, [&nNextIteration] { return nCurrentIteration==nNextIteration; });
        lock.unlock();
    
        ++nNextIteration;
    
        //Do thread stuff here
    
        lock.lock();
        if (--nWorkerCount == 0)
        {
          lock.unlock();
          workComplete.notify_one();
        }
      }
    }
    

    Be aware that this solution isn't quite complete. If a worker encounters an exception, then the main thread will hang (because the dead worker will never reduce nWorkerCount). You'll likely need a strategy to deal with that scenario.

    Incidentally, this pattern is called a barrier.