
C++11 lockfree single producer single consumer: how to avoid busy wait


I'm trying to implement a class that uses two threads: one for the producer and one for the consumer. The current implementation does not use locks:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <iostream>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

The application needs to enqueue work items for a certain amount of time and then sleep waiting for an event. This is a minimal main that simulates the behavior:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

I'm pretty sure that my implementation is buggy: what if the worker thread finishes its loop and, before it executes working_ = false, another enqueue arrives? Is it possible to make my code thread-safe without using locks?
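The interleaving in question can be replayed deterministically in a single thread. In this hypothetical sketch a std::deque stands in for the SPSC queue and a plain bool for working_, and the producer's and worker's steps are written out by hand in the problematic order. It shows the second value left in the queue with no worker thread running to consume it.

```cpp
#include <deque>

// Hypothetical single-threaded replay of the race in the first Worker.
// The producer (P) and worker (W) steps are executed by hand in the
// problematic order; std::deque stands in for the SPSC queue.
struct RaceReplay {
    std::deque<int> queue;
    bool working = false;         // plays the role of working_
    bool worker_running = false;  // is a worker thread alive?
    int processed = 0;

    // Returns the number of values left with no worker to consume them.
    int run() {
        // P: enqueue(1) pushes, sees working == false, starts the worker.
        queue.push_back(1);
        if (!working) { working = true; worker_running = true; }

        // W: the while loop drains the queue, then pop() fails and it exits.
        while (!queue.empty()) { queue.pop_front(); ++processed; }

        // P: enqueue(2) runs before W resets the flag; working is still true,
        // so no new worker is started.
        queue.push_back(2);
        if (!working) { working = true; worker_running = true; }

        // W: working_ = false; the thread function returns.
        working = false;
        worker_running = false;

        return static_cast<int>(queue.size());
    }
};
```

The value is not lost forever: it waits until the next enqueue starts a new thread. In this first version that restart has its own problem, because assigning a new std::thread to worker_ while the previous one is still joinable calls std::terminate.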

The solution requires:

  • a fast enqueue
  • the destructor has to quit even if the queue is not empty
  • no busy wait, because there are long periods of time in which the worker thread is idle
  • no locks if possible

Edit

I wrote another implementation of the Worker class based on your suggestions. Here is my second attempt:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

I introduced the worker_.join() inside the enqueue method. The join can hurt performance, but it only has to wait in a rare case: when the queue becomes empty and another enqueue arrives before the worker thread exits. The working_ variable is now an atomic_flag that is set in enqueue and cleared in work. The additional while loop after working_.clear() is needed because of a value pushed after the first loop ends but before the clear: its enqueue sees the flag still set and does not start a new thread, so without the second loop the value would wait in the queue until the next enqueue.
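The reasoning behind the additional loop can be checked with another deterministic, single-threaded replay. This is a hypothetical sketch: a std::deque stands in for the queue, counters stand in for starting threads, and the steps follow the interleaving described above (push after the first loop, before the clear), run once without and once with the second loop.

```cpp
#include <atomic>
#include <deque>

// Hypothetical single-threaded replay of the second attempt's protocol for
// the interleaving the additional while loop is meant to cover: a value is
// pushed after the worker's first loop ends but before working_.clear().
struct RecheckReplay {
    std::deque<int> queue;        // stands in for the SPSC queue
    std::atomic_flag working;     // plays the role of working_
    int processed = 0;
    int workers_started = 0;

    RecheckReplay() { working.clear(); }  // known initial state in any C++ version

    // Producer side of enqueue(): test_and_set() returns the previous value,
    // so only the enqueue that finds the flag clear starts a worker.
    void enqueue(int value) {
        queue.push_back(value);
        if (!working.test_and_set()) ++workers_started;
    }

    void drain() {                // one of work()'s while loops
        while (!queue.empty()) { queue.pop_front(); ++processed; }
    }

    void run(bool recheck) {
        enqueue(1);               // P: flag was clear, a worker is started
        drain();                  // W: first loop empties the queue
        enqueue(2);               // P: flag still set, no new worker
        working.clear();          // W: working_.clear()
        if (recheck) drain();     // W: the additional while loop
    }
};
```

With recheck == false, value 2 stays queued until a later enqueue finds the flag clear; with recheck == true it is consumed by the same worker and only one worker is ever started.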

Is this implementation correct?

I did some tests and the implementation seems to work.

Off-topic: is it better to post this as an edit or as an answer?


Solution

  • This is my solution to the question. I don't really like answering my own question, but I think showing actual code may help others.

    #include <boost/lockfree/spsc_queue.hpp>
    #include <atomic>
    #include <iostream>
    #include <thread>
    // I used this semaphore class: https://gist.github.com/yohhoy/2156481
    #include "binsem.hpp"
    
    using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;
    
    class Worker
    {
    public:
        // the worker thread starts in the constructor
        Worker()
            : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
            , worker_([this]{ work(); })
        { } 
    
        ~Worker() {
            // exit even if the work has not been completed
            done_ = true;
            semaphore_.signal();
            worker_.join();
        }
    
        bool enqueue(int value) {
            bool enqueued = queue_.push(value);
            if (!working_.test_and_set())
                // signal to the worker thread to wake up
                semaphore_.signal();
            return enqueued;
        }
    
        void work() {
            int value;
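            // the worker thread stays alive until done_ is set
            while (!done_) {
                // sleep (no busy wait) until enqueue or the destructor signals
                semaphore_.wait();
                while (!done_ && queue_.pop(value)) {
                    // perform actual work
                    std::cout << value << std::endl;
                }
                working_.clear();
                // re-check: values pushed after the loop above ended but
                // before the clear did not trigger a new signal
                while (!done_ && queue_.pop(value)) {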
                    // perform actual work
                    std::cout << value << std::endl;
                }
            }
        }
    
    private:
        std::atomic_flag working_;
        std::atomic<bool> done_;
        binsem semaphore_;
        Queue queue_;
        std::thread worker_;
    };
    

    I tried @Cameron's suggestion: keep the thread alive instead of shutting it down, and add a semaphore. The semaphore is used only by the first enqueue after an idle period and after the last batch of work, so the code is not lock-free in these two cases, but only in them.
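To try the whole design without Boost or the gist, here is a self-contained sketch under stated assumptions. SpscRing is a hypothetical minimal ring buffer standing in for boost::lockfree::spsc_queue, WakeSignal is a hypothetical mutex/condition-variable binary semaphore standing in for binsem, std::atomic<bool>::exchange(true) replaces std::atomic_flag::test_and_set (same "returns the previous value" behavior, with portable initialization), and work() counts values instead of printing them so the result can be checked.

```cpp
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical stand-in for boost::lockfree::spsc_queue: a fixed-size ring
// buffer for exactly one producer and one consumer. All atomics use the
// default seq_cst ordering, which keeps the push/flag and clear/pop
// handshake in Worker straightforward to reason about.
template <typename T, std::size_t N>
class SpscRing {
public:
    bool push(const T& v) {                       // producer thread only
        std::size_t h = head_.load();
        std::size_t next = (h + 1) % N;
        if (next == tail_.load()) return false;   // full (capacity is N - 1)
        buf_[h] = v;
        head_.store(next);                        // publish the element
        return true;
    }
    bool pop(T& out) {                            // consumer thread only
        std::size_t t = tail_.load();
        if (t == head_.load()) return false;      // empty
        out = buf_[t];
        tail_.store((t + 1) % N);                 // hand the slot back
        return true;
    }
private:
    std::array<T, N> buf_{};
    std::atomic<std::size_t> head_{0}, tail_{0};
};

// Hypothetical stand-in for binsem: binary semaphore on a mutex + condvar.
class WakeSignal {
public:
    void signal() {
        { std::lock_guard<std::mutex> lock(m_); set_ = true; }
        cv_.notify_one();
    }
    void wait() {                                 // sleeps, no busy wait
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return set_; });
        set_ = false;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    bool set_ = false;
};

class Worker {
public:
    Worker() : worker_([this] { work(); }) {}     // one thread for the object's lifetime
    ~Worker() {
        done_ = true;                             // quit even if the queue is not empty
        wake_.signal();
        worker_.join();
    }
    bool enqueue(int value) {
        bool ok = queue_.push(value);
        if (!working_.exchange(true))             // worker may be idle: wake it
            wake_.signal();
        return ok;
    }
    int processed() const { return processed_.load(); }
    long long sum() const { return sum_.load(); }
private:
    void work() {
        int value;
        while (!done_) {
            wake_.wait();
            while (!done_ && queue_.pop(value)) consume(value);
            working_.store(false);
            // Re-check: catches values pushed after the loop above ended but
            // before the store, whose enqueue saw working_ == true.
            while (!done_ && queue_.pop(value)) consume(value);
        }
    }
    void consume(int value) { sum_ += value; ++processed_; }  // instead of std::cout

    std::atomic<bool> working_{false};
    std::atomic<bool> done_{false};
    std::atomic<int> processed_{0};
    std::atomic<long long> sum_{0};
    WakeSignal wake_;
    SpscRing<int, 1024> queue_;
    std::thread worker_;                          // declared last: starts after the rest is built
};
```

Usage follows the question's main: call enqueue() from a single producer thread in bursts separated by idle pauses, and let the destructor stop the worker; it returns without draining whatever is still queued.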

    I did some performance comparisons between my previous version (see the edit in my question) and this one. There is no significant difference when there are few starts and stops. However, enqueue is 10 times faster when it has to signal the worker thread instead of starting a new thread. This is a rare case, so it is not very important, but it is still an improvement.

    This implementation satisfies:

    • lock-free in the common case (when enqueue and work are busy);
    • no busy wait when there are no enqueues for a long time;
    • the destructor exits as soon as possible;
    • correctness?? :)