Search code examples
c++multithreadingboostsynchronizationboost-thread

Producer-Consumer: Lost Wake-up issue


I was trying to write code for the Producer-Consumer problem. The code below works fine most of the time, but it sometimes gets stuck because of a "lost wake-up" (I guess). I tried adding a thread sleep(), but it didn't help. What modification is needed to handle this case in my code? Can a semaphore be helpful here? If yes, how would I implement one here?

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>

using namespace std;

// State shared between the producer and consumer threads.
int product = 0;                 // incremented by producer(), printed by consumer()
boost::mutex mutex;              // locked by both consumer() and producer() around their waits
boost::condition_variable cv;    // producer() notifies, consumer() waits
boost::condition_variable pv;    // consumer() notifies, producer() waits
bool done = false;               // set to true by producer() after its loop; reset by main()

// Consumer thread body: while `done` is false, waits on `cv`, prints `product`
// and then signals `pv`.
// NOTE(review): the loop condition reads `done` without holding `mutex`, and
// producer() writes `done` without the lock as well.
void consumer(){
    while(done==false){
        //cout << "start c" << endl
        boost::mutex::scoped_lock lock(mutex);
        // Unconditional wait: no flag or predicate records whether producer()
        // has already called cv.notify_one(). A notification issued before this
        // thread reaches wait() is therefore not seen, and this call blocks.
        cv.wait(lock);
        //cout << "wakeup c" << endl;
        // Re-check `done` after waking: producer() also notifies `cv` once
        // after its loop, when there is no new product to print.
        if (done==false)
        {
            cout << product << endl;
            //cout << "notify c" << endl;
            pv.notify_one();
        }
        //cout << "end c" << endl;
    }
}

// Producer thread body: ten times, takes `mutex`, sleeps, increments `product`,
// notifies `cv` and then waits on `pv`. After the loop it notifies `cv` once
// more and sets `done`.
void producer(){
    for(int i=0;i<10;i++){
        //cout << "start p" << endl;
        boost::mutex::scoped_lock lock(mutex);
        // NOTE(review): the sleep happens while `mutex` is held, so consumer()
        // cannot acquire the lock (and reach cv.wait) during this time.
        boost::this_thread::sleep(boost::posix_time::microseconds(50000));
        ++product;
        //cout << "notify p" << endl;
        // If consumer() is not yet blocked in cv.wait(), this notification has
        // no effect and nothing remembers that it was sent.
        cv.notify_one();
        // Unconditional wait: returns on any wake-up of `pv`; no flag confirms
        // that consumer() actually processed the current product.
        pv.wait(lock);
        //cout << "wakeup p" << endl;
    }
    //cout << "end p" << endl;
    // NOTE(review): the notification is sent before `done` is set, and neither
    // statement runs under `mutex`, so consumer() may wake up and still
    // observe done == false.
    cv.notify_one();
    done = true;
}

// Runs the producer/consumer pair 1000 times in sequence, then prints "done"
// and waits for a key press.
int main()
{
    for (int remaining = 1000; remaining > 0; --remaining) {
        /*
        Known weakness: this handshake is subject to the "lost wakeup" problem.
        If the producer calls notify() on the condition before the consumer has
        actually called wait(), both threads end up in wait() indefinitely.
        */
        boost::thread consumerThread(&consumer);
        boost::thread producerThread(&producer);

        // Both threads must have finished before the flag is re-armed.
        producerThread.join();
        consumerThread.join();

        done = false;
        //cout << "process end" << endl;
    }

    cout << "done" << endl;
    getchar();
    return 0;
}

Solution

  • Yes, you want a way to know (in the consumer) that you "missed" a signal. A semaphore can help. There's more than one way to skin a cat, so here's my simple take on it (using just C++11 standard library features):

    // Counting semaphore built from a mutex and a condition variable.
    //
    // notify() adds one permit; wait() blocks until a permit is available and
    // consumes it. Because permits are counted, a notify() that happens before
    // the matching wait() is remembered instead of being lost.
    class semaphore
    {
    private:
        std::mutex mtx;              // guards `count`
        std::condition_variable cv;  // signalled each time `count` is incremented
        int count;                   // number of permits currently available

    public:
        // count_: initial number of permits (none by default).
        semaphore(int count_ = 0) : count(count_) { }

        // Adds one permit and wakes one waiting thread, if there is one.
        void notify()
        {
            std::unique_lock<std::mutex> lck(mtx);
            ++count;
            cv.notify_one();
        }

        // Blocks until a permit is available, then consumes it.
        void wait() { wait([]{}); }

        // Blocks until a permit is available, consumes it, then invokes `func`
        // while the semaphore's mutex is still held and returns its result.
        //
        // The parameter has no default argument: a default can never take part
        // in deducing F, so the zero-argument case is served by the overload
        // above. `func` is forwarded so that it is invoked with the same value
        // category the return type was computed from.
        template <typename F>
        auto wait(F&& func) -> decltype(std::declval<F>()())
        {
            std::unique_lock<std::mutex> lck(mtx);

            // The predicate form re-checks the count after every wake-up, so
            // spurious wake-ups are handled.
            cv.wait(lck, [this] { return count != 0; });
            --count;

            return std::forward<F>(func)();
        }
    };
    

    For convenience, I added a wait() overload that takes a function to be executed under the lock. This makes it possible for the consumer to operate the 'semaphore' without ever manually operating the lock (and still get the value of product without data races):

    semaphore sem;

    // Consumer loop: takes one permit from `sem` per iteration, prints the
    // product captured with it, and notifies `processed_signal`. Leaves the
    // loop once `done` was observed as true.
    void consumer() {
        for (;;) {
            bool finished = false;
            // Read `done` and `product` inside the callable that sem.wait() runs
            // under its own lock.
            int item = sem.wait([&finished] {
                finished = done;
                return product;
            });

            if (finished) {
                return;
            }

            std::cout << item << std::endl;

            // The notification is sent while processed_mutex is held; the lock
            // is released at the end of this iteration.
            std::unique_lock<std::mutex> guard(processed_mutex);
            processed_signal.notify_one();
        }
    }
    

    A fully working demo: Live on Coliru:

    #include <cassert>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <utility>
    
    // Counting semaphore built from a mutex and a condition variable.
    //
    // notify() adds one permit; wait() blocks until a permit is available and
    // consumes it. Because permits are counted, a notify() that happens before
    // the matching wait() is remembered instead of being lost.
    class semaphore
    {
    private:
        std::mutex mtx;              // guards `count`
        std::condition_variable cv;  // signalled each time `count` is incremented
        int count;                   // number of permits currently available

    public:
        // count_: initial number of permits (none by default).
        semaphore(int count_ = 0) : count(count_) { }

        // Adds one permit and wakes one waiting thread, if there is one.
        void notify()
        {
            std::unique_lock<std::mutex> lck(mtx);
            ++count;
            cv.notify_one();
        }

        // Blocks until a permit is available, then consumes it.
        void wait() { wait([]{}); }

        // Blocks until a permit is available, consumes it, then invokes `func`
        // while the semaphore's mutex is still held and returns its result.
        //
        // The parameter has no default argument: a default can never take part
        // in deducing F, so the zero-argument case is served by the overload
        // above. `func` is forwarded so that it is invoked with the same value
        // category the return type was computed from.
        template <typename F>
        auto wait(F&& func) -> decltype(std::declval<F>()())
        {
            std::unique_lock<std::mutex> lck(mtx);

            // The predicate form re-checks the count after every wake-up, so
            // spurious wake-ups are handled.
            cv.wait(lck, [this] { return count != 0; });
            --count;

            return std::forward<F>(func)();
        }
    };
    
    // Producer -> consumer channel: one permit per published product, plus one
    // final permit that announces `done`.
    semaphore sem;

    int product = 0;

    // Consumer -> producer acknowledgement. `processed` is the actual state and
    // `processed_signal` only wakes the producer up to look at it, so neither a
    // spurious wake-up nor an early notification can release the producer
    // before the consumer has read the current product.
    std::mutex processed_mutex;
    std::condition_variable processed_signal;
    bool processed = false;  // guarded by processed_mutex

    bool done = false;

    // Consumer loop: takes one permit per iteration, prints the product that
    // was captured together with it, and acknowledges it to the producer.
    // Returns when the producer has set `done`.
    //
    // check: value of `product` before the first item; each received product
    //        must be exactly one greater than the previous one.
    void consumer(int check) {
        while (true) {
            bool stop = false;
            // `done` and `product` are read inside the callable that sem.wait()
            // runs under the semaphore's lock.
            const int received_product = sem.wait([&stop] { stop = done; return product; });

            if (stop)
                break;

            std::cout << received_product << std::endl;

            // The increment lives outside assert() so the expression passed to
            // the macro has no side effects (it vanishes under NDEBUG).
            ++check;
            assert(check == received_product);

            {
                std::lock_guard<std::mutex> lock(processed_mutex);
                processed = true;
            }
            processed_signal.notify_one();
        }
    }

    // Producer: publishes ten products, one at a time, waiting for the consumer
    // to acknowledge each before producing the next; then sets `done` and posts
    // one last permit so the consumer can observe it.
    void producer() {
        std::unique_lock<std::mutex> lock(processed_mutex);
        for(int i = 0; i < 10; ++i) {
            ++product;
            processed = false;  // re-arm the acknowledgement before publishing
            sem.notify();
            // Predicate wait: only an actual acknowledgement lets us continue.
            processed_signal.wait(lock, [] { return processed; });
        }
        done = true;
        sem.notify();
    }
    
    // Runs the producer/consumer exchange 1000 times in sequence, printing
    // "process end" after each round and "done" at the end.
    int main() {
        for (int remaining = 1000; remaining > 0; --remaining) {
            // The consumer receives the current value of `product` as its
            // starting point for the sequence check.
            std::thread consumer_thread(&consumer, product);
            std::thread producer_thread(&producer);

            // Both threads must have finished before `done` is re-armed.
            producer_thread.join();
            consumer_thread.join();

            done = false;
            std::cout << "process end" << std::endl;
        }

        std::cout << "done" << std::endl;
    }