c++ multithreading race-condition producer-consumer

C++ Single Producer Multiple Consumer Program Crashes Sporadically


In the code below, I create one producer thread and n consumer threads. Each consumer reads from its own dedicated queue and prints to stdout. From time to time the code crashes at the statement consumerQueues[id]->empty(). In the debugger I see that consumerQueues[id] is 0x0 when it crashes. In the init() function, I create the ith consumer queue before the ith worker thread is created, so I am not sure why consumerQueues[id] would still be 0x0. Please help me figure out what is going on.

#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <string>
#include <vector>

class Test
{
private:
    void producer()
    {
        while(true)
        {
            std::string s = "abc";
            for(const auto& q : consumerQueues)
            {
                std::unique_lock<std::mutex> lock(mutex);
                q->push(s);
                condition_variable.notify_all();
            }
        }
    }

    void consumer(int id)
    {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                while(consumerQueues[id]->empty())
                {
                    condition_variable.wait(lock);
                }
                job = consumerQueues[id]->front();
                consumerQueues[id]->pop();
            }
            std::cout << "ID "<< id << " job " << job << std::endl;
        }
    }

    std::mutex mutex;
    std::condition_variable condition_variable;
    std::vector<std::thread> workers;
    std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
    std::thread producerThread;

public:

    Test(const unsigned n_threads):
    workers(std::vector<std::thread>(n_threads))
    {}

    Test(const Test &) = delete;
    Test(Test &&) = delete;

    Test & operator=(const Test &) = delete;
    Test & operator=(Test &&) = delete;

    void init()
    {
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
        }
       producerThread  = std::thread(&Test::producer, this);
    }

    ~Test()
    {
        producerThread.join();
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            if(workers[i].joinable())
            {
                workers[i].join();
            }
        }
    }
};


int main()
{
    Test t(1000);
    t.init();
    return 0;
}

Solution

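  • Your init function modifies the std::vector consumerQueues without holding the mutex, while the consumer threads that have already been started read from it. Each push_back can reallocate the vector's storage. When that happens, the existing shared_ptr elements are moved to a new buffer, so a consumer that is still looking at the old buffer can find a moved-from (null) pointer or freed memory. That is the most likely reason you see 0x0.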

    To make this work, your init function needs to be something like this:

     void init() {
         for (unsigned i = 0; i < workers.size(); ++i) {
             // Hold the same mutex the consumers use while the vector is modified.
             std::unique_lock<std::mutex> lock(mutex);
             consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
             workers[i] = std::thread(&Test::consumer, this, i);
         }
         producerThread = std::thread(&Test::producer, this);
     }
    

    From: http://www.cplusplus.com/reference/vector/vector/push_back/

    Data races

    The container is modified. If a reallocation happens, all contained elements are modified. Otherwise, no existing element is accessed, and concurrently accessing or modifying them is safe.
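    To make the quoted rule concrete, here is a minimal sketch that counts how often a vector of queue pointers reallocates while it grows, with and without a prior reserve. The helper names countReallocations and movedFromIsNull are made up for this illustration and are not part of the question's code. The second helper shows that a moved-from shared_ptr is null, which is consistent with the 0x0 seen in the debugger.

```cpp
#include <cstddef>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using Queue = std::queue<std::string>;

// Hypothetical helper: appends n queues and counts how many times the
// vector's capacity changes, i.e. how many reallocations took place.
std::size_t countReallocations(std::size_t n, bool reserveFirst)
{
    std::vector<std::shared_ptr<Queue>> queues;
    if (reserveFirst)
        queues.reserve(n);  // capacity is now at least n

    std::size_t reallocations = 0;
    std::size_t capacity = queues.capacity();
    for (std::size_t i = 0; i < n; ++i)
    {
        queues.push_back(std::make_shared<Queue>());
        if (queues.capacity() != capacity)
        {
            // The buffer was replaced; every existing element was moved.
            ++reallocations;
            capacity = queues.capacity();
        }
    }
    return reallocations;
}

// Hypothetical helper: a shared_ptr that has been moved from is empty,
// which is the state elements in the old buffer are left in.
bool movedFromIsNull()
{
    auto original = std::make_shared<Queue>();
    auto moved = std::move(original);
    return original == nullptr && moved != nullptr;
}
```

    Calling countReallocations(1000, false) returns a positive number whose exact value depends on the standard library's growth policy, while countReallocations(1000, true) returns 0.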

    Reallocations happen repeatedly while the vector grows from 0 to 1000 elements, so you can also reserve the capacity up front to make sure that no reallocation happens:

     void init() {
         // Allocate room for every queue once, so push_back never reallocates.
         consumerQueues.reserve(workers.size());
         for (unsigned i = 0; i < workers.size(); ++i) {
             consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
             workers[i] = std::thread(&Test::consumer, this, i);
         }
         producerThread = std::thread(&Test::producer, this);
     }
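
    Another way to avoid the problem, sketched below under a few assumptions, is to create every queue before any thread starts, so the vector itself is never modified while threads are running. This is not the code from the question: the function name runPipeline, the done flag and the finite job count are introduced here only so that the example terminates. It also stores the queues by value instead of through shared_ptr, and it does not print each job.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: runs one producer and nConsumers consumers and
// returns the total number of jobs that were consumed.
std::size_t runPipeline(std::size_t nConsumers, std::size_t jobsPerConsumer)
{
    std::mutex mutex;
    std::condition_variable cv;
    // All queues exist before the first thread is started.
    std::vector<std::queue<std::string>> queues(nConsumers);
    bool done = false;         // guarded by mutex
    std::size_t consumed = 0;  // guarded by mutex

    auto consumer = [&](std::size_t id) {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                cv.wait(lock, [&] { return !queues[id].empty() || done; });
                if (queues[id].empty())
                    return;  // producer finished and this queue is drained
                job = std::move(queues[id].front());
                queues[id].pop();
                ++consumed;
            }
            // The job would be processed here, outside the lock.
        }
    };

    std::vector<std::thread> workers;
    workers.reserve(nConsumers);
    for (std::size_t i = 0; i < nConsumers; ++i)
        workers.emplace_back(consumer, i);

    std::thread producer([&] {
        for (std::size_t j = 0; j < jobsPerConsumer; ++j)
        {
            for (auto& q : queues)
            {
                std::lock_guard<std::mutex> lock(mutex);
                q.push("abc");
            }
            cv.notify_all();
        }
        {
            std::lock_guard<std::mutex> lock(mutex);
            done = true;  // tell the consumers that no more jobs will arrive
        }
        cv.notify_all();
    });

    producer.join();
    for (auto& w : workers)
        w.join();
    return consumed;
}
```

    For example, runPipeline(4, 100) pushes 100 jobs to each of 4 consumers and returns 400 once every thread has been joined. Because the vector is fully built before the first std::thread is constructed, the consumers never see it in a half-modified state.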