Search code examples
c++ multithreading condition-variable

C++ - Multi-threading - Communication between threads


#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>

using namespace std;

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

// Producer loop. Until the stop flag is observed as set, each pass takes the
// mutex, appends one pseudo-random value in [0, 99] to the queue, increments
// producer_count and signals the condition variable once.
// Nothing in this loop waits for the consumer: as soon as the lock is released
// at the end of a pass, the next pass tries to take it again and push another
// value.
void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
    for (;;) {
        if (workdone) {
            break;
        }
        unique_lock<std::mutex> guard(m);
        numbers.push(rand() % 100);
        ++producer_count;
        cv.notify_one();  // one notification per pushed value
    }
}

// Consumer loop. Until the stop flag is observed as set, each pass takes the
// mutex, blocks on the condition variable, then prints and removes the front
// element of the queue and increments consumer_count.
// The wait has no predicate, so one wake-up handles exactly one element no
// matter how many values were pushed while this thread was not waiting.
void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
    for (;;) {
        if (workdone) {
            break;
        }
        unique_lock<std::mutex> guard(m);
        cv.wait(guard);  // mutex is released while blocked and re-acquired before returning
        const int value = numbers.front();
        numbers.pop();
        cout << value << endl;
        ++consumer_count;
    }
}

// Entry point: runs the producer and the consumer concurrently for three
// seconds, raises the stop flag, joins both threads and prints the number of
// pushed values followed by the number of popped values.
int main() {
    // state shared by both threads (handed over by reference)
    queue<int> numbers;
    mutex m;
    condition_variable cv;
    bool workdone = false;

    // launch the producer first, then the consumer
    thread producer(generateNumbers, ref(numbers), ref(cv), ref(m), ref(workdone));
    thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));

    // let both threads run for 3 seconds, then raise the stop flag
    const auto run_time = std::chrono::seconds(3);
    this_thread::sleep_for(run_time);
    workdone = true;

    // block until both thread functions have returned
    producer.join();
    consumer.join();

    // report both counters, producer first
    cout << producer_count << endl << consumer_count << endl;

    return 0;
}

Hello everyone, I tried to implement the producer-consumer pattern in C++. The producer thread generates random integers, adds them to a queue and then notifies the consumer thread that a new number was added.

The consumer thread waits for the notification and then prints the first element of the queue to the console and deletes it.

I incremented a counter for every number that is added to the queue and another counter for every number that is taken out of the queue.

I expected the two counters to hold the same value after the program is finished, however the difference is huge. The counter that represents the addition to the queue is always in the million range (3871876 in my last test) and the counter that represents the consumer which takes numbers out of the queue is always below 100k (89993 in my last test).

Can someone explain to me why there is such a huge difference? Do I have to add another condition variable so that the producer thread waits for the consumer thread as well? Thanks!


Solution

  • No need for a second std::condition_variable, just reuse the one you have. As mentioned by others, you should consider using std::atomic<bool> instead of a plain bool. But I must admit that g++ with -O3 does not optimize it away.

    #include <iostream>
    #include <thread>
    #include <condition_variable>
    #include <queue>
    #include <cstdlib>
    #include <chrono>
    #include <ctime>
    #include <random>
    #include <atomic>
    
    //counts every number that is added to the queue
    static long long producer_count = 0;
    //counts every number that is taken out of the queue
    static long long consumer_count = 0;
    
    // Generator half of the hand-off. Until the stop flag reads true, each pass
    // takes the mutex, pushes one pseudo-random value in [0, 99], increments
    // producer_count, signals the condition variable and then blocks on that
    // same condition variable until it is signalled again.
    void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
    {
        for (;;)
        {
            if (workdone.load())
                break;

            std::unique_lock<std::mutex> guard(m);
            numbers.push(rand() % 100);
            ++producer_count;
            cv.notify_one(); // a value is available for the worker
            cv.wait(guard);  // give up the mutex and sleep until cv is signalled
        }
    }
    
    // Worker half of the hand-off. Until the stop flag reads true, each pass
    // takes the mutex, signals the condition variable (to release a generator
    // that is waiting for its turn), blocks on the same condition variable, and
    // then prints and removes the front element and increments consumer_count.
    //
    // numbers  - queue shared with the generator; only touched while m is held
    // cv       - condition variable used in both directions of the hand-off
    // m        - mutex protecting numbers
    // workdone - stop flag; the loop ends once it reads true
    void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
    {
        while(!workdone.load())
        {
            std::unique_lock<std::mutex> lk(m);
            cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
            cv.wait(lk); // Wait for the generator to complete
            // Being woken does not prove that a value was pushed: the wake-up may
            // come from the notify_all() issued at shutdown, or be spurious.
            // front()/pop() on an empty queue is undefined behaviour, so go back
            // to the loop condition instead of touching the queue.
            if (numbers.empty())
                continue;
            std::cout << numbers.front() << std::endl;
            numbers.pop();
            consumer_count++;
        }
    }
    
    // Entry point: runs generator and worker concurrently for three seconds,
    // sets the stop flag, wakes every thread blocked on the condition variable,
    // joins both threads and prints the pushed count followed by the popped count.
    int main() {
        // state shared by both threads (handed over by reference)
        std::queue<int> numbers;
        std::mutex m;
        std::condition_variable cv;
        std::atomic<bool> workdone(false);

        // launch the generator first, then the worker
        std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
        std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));

        // let both threads run for 3 seconds, then request the stop
        const auto run_time = std::chrono::seconds(3);
        std::this_thread::sleep_for(run_time);
        workdone = true;
        cv.notify_all(); // wake threads blocked in cv.wait() so they can see the flag

        // block until both thread functions have returned
        producer.join();
        consumer.join();

        // report both counters, generator first
        std::cout << producer_count << std::endl << consumer_count << std::endl;

        return 0;
    }
    

    EDIT:

    To avoid the sporadic off-by-one error you could use this:

    #include <iostream>
    #include <thread>
    #include <condition_variable>
    #include <queue>
    #include <cstdlib>
    #include <chrono>
    #include <ctime>
    #include <random>
    #include <atomic>
    
    //counts every number that is added to the queue
    static long long producer_count = 0;
    //counts every number that is taken out of the queue
    static long long consumer_count = 0;
    
    // Generator: pushes one pseudo-random value in [0, 99] at a time and then
    // waits until the queue has been drained (or a stop was requested) before
    // producing the next one.
    //
    // numbers  - queue shared with the worker; only touched while m is held
    // cv       - condition variable used in both directions of the hand-off
    // m        - mutex protecting numbers
    // workdone - stop flag; once it reads true no further value is pushed
    void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
    {
        // True once the pushed value has been taken or a stop was requested.
        // Always evaluated with m held.
        const auto handed_off = [&] { return numbers.empty() || workdone.load(); };

        while(!workdone.load())
        {
            std::unique_lock<std::mutex> lk(m);
            // The flag may have been set while this thread was acquiring the
            // lock; re-check so no extra value is pushed after the stop request.
            if (workdone.load())
                break;
            numbers.push(rand() % 100);
            producer_count++;
            cv.notify_one(); // Notify worker
            // Wait for the worker to take the value. The predicate makes the
            // wait immune to spurious wake-ups. The wait is timed because the
            // stop flag may be stored without m being held, in which case the
            // accompanying notification can arrive just before this thread
            // blocks and would otherwise be lost for good.
            while (!cv.wait_for(lk, std::chrono::milliseconds(10), handed_off))
            {
            }
        }
    }
    
    // Worker: prints and removes queued values until a stop has been requested
    // AND the queue is empty, so values pushed before the stop are still consumed.
    //
    // numbers  - queue shared with the generator; only touched while m is held
    // cv       - condition variable used in both directions of the hand-off
    // m        - mutex protecting numbers
    // workdone - stop flag; the loop ends once it reads true and numbers is empty
    void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
    {
        // True when there is something to consume or a stop was requested.
        // Always evaluated with m held.
        const auto ready = [&] { return !numbers.empty() || workdone.load(); };

        // The lock is held for the whole loop except while blocked in wait_for,
        // so every access to the queue (including the emptiness checks) is
        // serialized with the generator's push.
        std::unique_lock<std::mutex> lk(m);
        for (;;)
        {
            // The predicate guards against spurious wake-ups. The wait is timed
            // because the stop flag may be stored without m being held, in which
            // case the accompanying notification can arrive just before this
            // thread blocks and would otherwise be lost for good.
            while (!cv.wait_for(lk, std::chrono::milliseconds(10), ready))
            {
            }
            if (numbers.empty())
                break; // stop requested and nothing left to consume
            std::cout << numbers.front() << std::endl;
            numbers.pop();
            consumer_count++;
            cv.notify_one(); // Notify generator that the value has been taken
        }
    }
    
    // Entry point: runs generator and worker concurrently for one second,
    // requests a stop, joins both threads and prints the pushed count followed
    // by the popped count.
    int main() {
        std::condition_variable cv;
        std::mutex m;
        std::atomic<bool> workdone(false);
        std::queue<int> numbers;

        //start threads
        std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
        std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


        //let the threads run for 1 second, then ask them to stop
        std::this_thread::sleep_for(std::chrono::seconds(1));
        {
            // Store the flag while owning the mutex, even though it is atomic:
            // a thread that tests the flag with m held and then blocks on cv
            // cannot have the store and notify_all() slip in between its test
            // and its wait.
            std::unique_lock<std::mutex> lk(m);
            workdone = true;
        }
        cv.notify_all(); // Wake every thread blocked on cv so it can see the flag

        producer.join();
        consumer.join();

        //output the counters
        std::cout << producer_count << std::endl;
        std::cout << consumer_count << std::endl;

        return 0;
    }