Search code examples
c++ multithreading boost boost-thread

Syncing Threads in Boost


I am trying to create an application that has one main thread and 10 slave threads. I want the slave threads to run once each time the main thread runs, so for each main-thread iteration every slave thread executes exactly once. I tried to handle this with two different condition variables: one is used by the slave threads, so they can wait until the main thread notifies them, and the other is used by the main thread and is signaled after each slave finishes its task, so the main thread can check whether all the slave threads are done. The code is as follows:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

// Shared state for the main-thread / slave-thread hand-off.

std::vector<boost::thread*> threads;  // slave thread handles, appended by main

// Per-slave "go" signal: data_ready[i] is guarded by *data_ready_mutex[i]
// and announced through *cond[i].
std::vector<boost::mutex*> data_ready_mutex;
std::vector<boost::condition_variable*> cond;
// std::vector<char> instead of std::vector<bool>: vector<bool> packs its
// elements into shared words, so writing element i under mutex i still races
// with a write to element j under mutex j. A lost update to a flag leaves the
// corresponding slave waiting forever.
std::vector<char> data_ready;
std::vector<int> num_run;  // number of tasks each slave has executed

boost::mutex check_finish_mutex;  // not referenced by any function in this listing
// Per-slave completion flags (char for the same reason as data_ready).
std::vector<char> finished;

boost::atomic<int> data(0);            // counter incremented once per task
boost::atomic<int> next_thread_id(0);  // hands out ids 0, 1, 2, ... to slaves

// "Some slave finished" notification from the slaves to the main thread.
boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    signal_finished(id);
  }
}

// Entry point: creates the per-slave synchronisation objects and the slave
// threads, then repeatedly signals all slaves and waits until each one has
// reported completion, printing the time spent per round.
int main()
{
  size_t nThreads = 10;

  // Everything the slaves index by id must exist before any thread starts.
  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    // Note: clock() reports processor time, not wall-clock time.
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    // Wait until every slave has set its `finished` flag. Each wake-up only
    // means "at least one slave finished", so re-check all flags every time.
    while (true)
    {
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;

      size_t i = 0;
      for (; i < finished.size(); i++)
      {
        if (!finished[i]) break;
      }
      if (i == finished.size()) break;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    {
      // Reset the flags under the mutex that guards them.
      boost::lock_guard<boost::mutex> lock(finished_task_mutex);
      for (size_t i = 0; i < threads.size(); i++)
        finished[i] = false;
    }
  }

  // Not reached: the loop above never exits.
  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

The problem is that the code stops somewhere and gets stuck in a deadlock.

I also tried a different implementation: I used an atomic<int> that counts the number of threads that have finished their task, and in the main thread I check whether that count equals the number of slave threads. This method also gets stuck somewhere and goes into a deadlock. The code is as follows:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

std::vector<boost::thread*> threads;                //!< Slave Threads array

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready
// std::vector<char> instead of std::vector<bool>: vector<bool> packs its
// elements into shared words, so element i (guarded by mutex i) and element j
// (guarded by mutex j) are not independent memory locations. A concurrent
// write can drop a flag update and leave a slave waiting forever.
std::vector<char>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< Condition variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data(0);                         //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id(0);               //!< id for the next thread (used for giving an id from 0,..., nThreads-1)

boost::atomic<int> num_threads_done(0);             //!< Stores the number of slave threads which have finished their task

//! Signals a slave thread to start its task
void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);
    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

//! Slave thread function
void slave_therad()
{
  // assign an id to the current slave_thread
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    // wait for a signal from the main thread
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    // make the data not ready, so the loop is not going to run without the main thread signal after the thread is done.
    data_ready[id] = false;

    // TASK for SLAVE THREAD
    data++;

    // Increase the number of times the thread is run
    num_run[id]++;

    // Increase the number of threads which has finished their tasks.
    num_threads_done++;

  }
}

void main()
{
  size_t nThreads = 10;

  // creating the data ready mutexes, conditional variables, data_ready variable (bools), num_runs array.
  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
  }

  // Creating the slave threads
  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  // Main Thread Body
  while (true)
  {
    clock_t start_time = clock();

    // Reset the number of threads which are done.
    num_threads_done = 0;

    // Signals the slave threads to start doing their task.
    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    // Wait until all the slave threads are done.
    while (true)
      if (num_threads_done == threads.size()) break;

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }

}

I even tried to fix the issue with barriers, but that did not fix my problem. The code is as follows:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

boost::barrier* barrier;                            //!< Barrier to make sure all the slave threads have done their tasks.

std::vector<boost::thread*> threads;                //!< Slave threads array

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready
// std::vector<char> instead of std::vector<bool>: vector<bool> packs its
// elements into shared words, so element i (guarded by mutex i) and element j
// (guarded by mutex j) are not independent memory locations. A concurrent
// write can drop a flag update and leave a slave waiting forever.
std::vector<char>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< Condition variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data(0);                         //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id(0);               //!< id for the next thread (used for giving an id from 0,..., nThreads-1)

boost::atomic<int> num_threads_done(0);             //!< Stores the number of slave threads which have finished their task

std::vector<char> finished;                         //!< Per-slave completion flags (char, not bool, for the same reason as data_ready).
boost::mutex finished_task_mutex;                   //!< Mutex to guard the finished_task variable
boost::condition_variable finished_task_cond;       //!< Condition variable to wait for the threads to finish their tasks.
boost::atomic<bool> finished_task(false);           //!< Variable which stores if the task of slave_threads are finished or not.

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    barrier->wait();

    signal_finished(id);
  }
}

// Entry point: creates the per-slave synchronisation objects, the barrier and
// the slave threads, then repeatedly signals all slaves and waits for the
// "finished" notification, printing the time spent per round.
int main()
{
  size_t nThreads = 10;

  // Everything the slaves index by id must exist before any thread starts.
  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  // The barrier is sized for the slaves only; the main thread does not wait on it.
  barrier = new boost::barrier(nThreads);

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    // Note: clock() reports processor time, not wall-clock time.
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    {
      // Block until a slave reports completion, then consume the notification.
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    {
      // Reset the flags under the mutex that guards them.
      boost::lock_guard<boost::mutex> lock(finished_task_mutex);
      for (size_t i = 0; i < threads.size(); i++)
        finished[i] = false;
    }
  }

  // Not reached: the loop above never exits.
  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

[UPDATED] I simply put the mutex, condition variable and data_ready flag in a struct as follows, and now the code is working. I think there was a bug with using pointers to mutexes and so on. The code is as follows:

//#define SYNC_WITH_BARRIER
#define SYNC_WITH_ATOMICS

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

// Never populated in this listing (main creates its threads through a
// boost::thread_group); only indexed by the join loop at the end of main.
std::vector<boost::thread*> threads;

// Source of the ids that slave threads print at start-up.
boost::atomic<int> next_thread_id(0);

// Not referenced by any function in this listing.
boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

// Number of slaves that have completed their task in the current round
// (incremented by the slaves when SYNC_WITH_ATOMICS is defined, reset by main).
boost::atomic<int> num_finished_tasks(0);

// One unit of per-slave state: the "go" flag with its mutex and condition
// variable, plus the slave's counters. Each slave thread runs slave_therad()
// on its own Work instance; the main thread calls signal_slave() on it.
struct Work
{
  // Stores a pointer to the shared barrier and puts every flag and counter
  // into a defined state. Without this, the first `!data_ready` test in
  // slave_therad() reads an indeterminate value.
  Work(boost::barrier& _barrier)
    : b(&_barrier)
    , data_ready(false)
    , num_run(0)
    , data(0)
    , finished(false)
  {

  }

  boost::barrier*           b;                 // shared barrier (used when SYNC_WITH_BARRIER is defined)
  boost::mutex              data_ready_mutex;  // guards data_ready
  boost::condition_variable data_ready_cond;   // signalled when data_ready becomes true
  bool                      data_ready;        // true while a task is pending for this slave
  int                       num_run;           // number of tasks this slave has executed
  boost::atomic<int>        data;              // counter incremented once per task
  bool                      finished;          // cleared at the start of each task

  // Marks a task as pending and wakes the slave (notifies while holding the mutex).
  void signal_slave()
  {
    {
      boost::lock_guard<boost::mutex> lock(data_ready_mutex);
      data_ready = true;
      data_ready_cond.notify_all();
    }    
  }

  // Slave thread body: waits for data_ready, performs one task, reports
  // completion through the barrier or the atomic counter, and repeats forever.
  void slave_therad()
  {
    int id = next_thread_id++;

    std::cout << "( " << id << " ) slave_thread created\n";
    while (true)
    {
      boost::unique_lock<boost::mutex> lock(data_ready_mutex);
      while (!data_ready)
      {
        data_ready_cond.wait(lock);
      }

      finished = false;

      data_ready = false;

      data++;

      num_run++;

#ifdef SYNC_WITH_BARRIER
      b->count_down_and_wait();
#else 
#ifdef SYNC_WITH_ATOMICS
      num_finished_tasks++;
#endif
#endif
    }
  }

};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

// Short alias for the clock used to time each round in main.
using hrc = boost::chrono::high_resolution_clock;

// Entry point: creates one Work item and one thread per slave, then
// repeatedly signals all slaves and waits for them (via the barrier or the
// atomic counter, depending on the SYNC_WITH_* macro), printing the time
// spent per round.
int main()
{
  size_t nThreads = 10;

  boost::thread_group tg;
  boost::ptr_vector<Work> work_items;
  work_items.reserve(nThreads);

  boost::barrier finish(nThreads + 1); // one for the main thread

  for (size_t i = 0; i < nThreads; i++)
  {
    work_items.push_back(new Work(finish));
    tg.create_thread(boost::bind(&Work::slave_therad, boost::ref(work_items.back()))); 
  }

  while (true)
  {
    auto start_time = hrc::now();

    num_finished_tasks = 0;

    for (size_t i = 0; i < work_items.size(); i++)
      work_items[i].signal_slave();

#ifdef SYNC_WITH_BARRIER
    finish.count_down_and_wait();
#else
#ifdef SYNC_WITH_ATOMICS
    // The counter is an int, so convert the size once and compare like with
    // like. Yield while spinning so the slaves are not starved of CPU time.
    const int expected_done = static_cast<int>(work_items.size());
    while (num_finished_tasks != expected_done)
      boost::this_thread::yield();
#endif
#endif

    std::cout << "Elapsed Time = " << hrc::now() - start_time << std::endl;

  }

  // Not reached: the loop above never exits. The threads live in the
  // thread_group, so that is what would have to be joined.
  tg.join_all();
}

Solution

  • @sehe even with barrier, it stuck in deadlock. – mmostajab 5 mins ago

    Since you don't show anything about what you're doing there, let me give you a startup boost by incorporating a large chunk of all the suggestions you received:

    Live On Coliru

    #include <boost/atomic.hpp>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    #include <iostream>
    #include <vector>
    
    namespace /*static*/ {
        boost::atomic<int> data;
        boost::atomic<int> num_threads_done;
    
        struct Work {
            void signal_slave()
            {
                boost::lock_guard<boost::mutex> lock(data_ready_mutex);
                data_ready = true;
                cond.notify_all();
            }
    
            void slave_thread()
            {
                static boost::atomic_int _id_gen(0);
                id = _id_gen++;
    
                std::cout << "(" << id << ") slave_thread created\n";
                while (true) {
    
                    boost::unique_lock<boost::mutex> lock(data_ready_mutex);
                    cond.wait(lock, [&]{ return data_ready; });
    
                    data_ready = false;
    
                    data++;
    
                    num_run++;
    
                    num_threads_done++;
                }
            }
    
          private:
            int id          = 0;
            bool data_ready = false;
            int  num_run    = 0;
    
            boost::mutex data_ready_mutex;
            boost::condition_variable cond;
    
        };
    
    }
    
    #include <boost/chrono.hpp>
    #include <boost/chrono/chrono_io.hpp>
    
    // Short alias for the clock used to time each round in main.
    using hrc = boost::chrono::high_resolution_clock;
    
    int main()
    {
        boost::thread_group tg;
    
        size_t nThreads = 10;
    
        std::vector<Work> works(nThreads);
    
        for (size_t i = 0; i < nThreads; i++) {
            tg.create_thread(boost::bind(&Work::slave_thread, boost::ref(works[i])));
        }
    
        while (true) {
            auto start_time = hrc::now();
    
            for (auto& w : works)
                w.signal_slave();
    
            std::cout << "Elapsed Time = " << (hrc::now()-start_time) << std::endl;
        }
    
        tg.join_all();
    }
    

    Bear in mind, I don't know what you're trying to achieve here. Adding a barrier I had this in mind: how to use boost barrier