c++ multithreading c++11 queue stdatomic

Lockless queue using std::atomic


I wish to create a lockless queue using std::atomic.
Here's my probably not so good first attempt at trying to do so:

template <typename T>
class atomic_queue
{
public:
    using value_type = T;
private:
    struct node
    {
        value_type m_value;
        node* m_next;
        node* m_prev;

        node(const value_type& value) :
            m_value(value),
            m_next(nullptr),
            m_prev(nullptr) {}
    };
private:
    std::atomic<node*> m_head = nullptr;
    std::atomic<node*> m_tail = nullptr;
public:
    void push(const value_type& value)
    {
        auto new_node = new node(value);

        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

        node* old_tail;
        do {
            old_tail = m_tail;
            new_node->m_prev = old_tail;
        } while (!m_tail.compare_exchange_strong(old_tail, new_node));
        new_node->m_prev->m_next = new_node;
    }

    void pop()
    {
        if (m_head.load(std::memory_order_relaxed) == nullptr)
        {
            return;
        }

        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

        node* old_head;
        do {
            old_head = m_head;
        } while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
        if (old_head)
        {
            delete old_head;
        }
    }

    bool empty()
    {
        return m_head.load(std::memory_order_relaxed) == nullptr;
    }

    value_type& front()
    {
        node* head = m_head.load(std::memory_order_acquire);
        return head->m_value;
    }
};

Something to note here is that I store m_prev on node so that I can update the m_next of the old tail after a successful push without going through m_tail, in case m_tail has already been changed by another thread. So even if another thread has already pushed a new value, the current thread still links the m_next of the node it saw as the tail to the new node.

There are a few things that are not really thread-safe as far as I can tell, and I can't think of a good way to solve them:

Let's assume thread1 pops the one and only item from the queue, so it enters the following if statement:

        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            // Now thread2 kicks in
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

And let's assume thread2 kicks in at the marked spot to push a new value to the queue. Then the following statement will be executed:

        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

and let us assume it finishes its push before thread1 continues. Only then does thread1 resume and execute:

        m_head.store(tmp, std::memory_order_relaxed);
        return;

and will basically undo thread2's push by setting m_head to nullptr. As far as I understand, memory orders can't help me in this scenario, so I'm not sure what my options are.

Another problematic scenario: let's assume we have 2 reader threads, thread3 and thread4, doing the same job:

    while (true)
    {
        if (!q.empty())
        {
            int v = q.front();
            q.pop();
            std::stringstream stream;
            stream << "thread_3/4: " << v << '\n';
            std::cout << stream.str();
        }
    }

And let us assume the queue is of size 1. Both threads could see that the queue is not empty, get a reference to the front data, pop the element, and print the same result.
It seems to me that locking would help in this scenario, but I do not wish to use locking. I also do not want the reading threads to care about synchronization problems, because the interface itself should be responsible for that. But since front and pop are independent, I don't see a good way to handle this.
There's also the problem that front might dereference nullptr, and I'm not sure how to handle that either. I could make the interface return a raw pointer or a std::optional, but neither solution seems right to me, so I would love to hear opinions on what should be done here.

Also, I'm not sure if I could get away with cheaper methods than CAS. I know I could go with a unique-slot approach, where each thread gets an index into a fixed array by calling fetch_add on a std::atomic<int> slot, so that each thread pushes to a unique index. But I don't like this approach since it limits the queue to a fixed size. On the other hand, using new and delete is probably not the fastest thing either. I could use a pool allocator of some sort, but then I would have to make sure it's synchronized as well, and that's a new level of pain.
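
For reference, the unique-slot idea could look roughly like the sketch below. It only covers the push side, and the names slot_buffer and try_push are made up for illustration. It assumes T is default-constructible and that elements are only read after all writers have finished (for example after joining the writer threads), so it is not a full queue.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <optional>

// Hypothetical fixed-capacity buffer: every push claims a unique index
// with a single fetch_add, so no CAS loop is needed on the push side.
template <typename T, std::size_t Capacity>
class slot_buffer
{
    std::array<T, Capacity> m_slots{};
    std::atomic<std::size_t> m_next{0};

public:
    // Returns the claimed index, or std::nullopt if the buffer is full.
    std::optional<std::size_t> try_push(const T& value)
    {
        // Relaxed is enough here because readers are assumed to synchronize
        // with the writers by other means (e.g. std::thread::join).
        const std::size_t index = m_next.fetch_add(1, std::memory_order_relaxed);
        if (index >= Capacity)
        {
            return std::nullopt;
        }
        m_slots[index] = value; // no other thread writes to this slot
        return index;
    }

    // Number of filled slots; only meaningful once all writers are done.
    std::size_t size() const
    {
        const std::size_t claimed = m_next.load(std::memory_order_relaxed);
        return claimed < Capacity ? claimed : Capacity;
    }

    // Only safe to call once all writers are done.
    const T& operator[](std::size_t index) const
    {
        return m_slots[index];
    }
};
```

The fixed capacity is exactly the limitation mentioned above: once Capacity indices have been handed out, every further try_push fails.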

I'm not even sure these are all the problems. These are the ones I could spot in my implementation, and I'm sure I didn't think of everything (or maybe I did?). Anyway, I would love to hear your thoughts on the described problems and maybe ways to overcome them.


Solution

  • There are several issues with your implementation, some of which you have already identified correctly.

    1. The race between the two m_head.store operations after the CAS on m_tail
    2. This loop potentially suffers from the ABA problem:
      do {
            old_head = m_head;
      } while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
    
    3. After removing a node in pop, you immediately delete it, but at that time another thread might still have a reference to it and access it (e.g. another thread in pop), resulting in a use-after-free. (This is also known as the memory reclamation problem.)
      Explanation: Suppose two threads are currently inside pop and have read the same value into old_head. The first thread goes ahead, performs the CAS on m_head and in the next step immediately deletes old_head. Only now does the second thread continue with its own attempt to update m_head, using old_head->m_next as the new value. That means thread two dereferences a pointer to the just deleted node.
    4. Your design requires two separate function calls to pop an item from the queue and get its value.
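
To make the ABA problem from the pop loop more concrete, here is a small sketch that replays one problematic interleaving on a single thread. It is a simplified illustration, not your queue: the node type and the function name stale_cas_succeeds are made up, and re-pushing the node a stands in for new returning the address of a node that was just deleted.

```cpp
#include <atomic>

struct node
{
    int value;
    node* next;
};

// Replays an ABA interleaving step by step on one thread.
// Returns true if the stale CAS succeeded and left head pointing
// at a node that is no longer part of the list.
bool stale_cas_succeeds()
{
    node c{3, nullptr};
    node b{2, &c};
    node a{1, &b};
    std::atomic<node*> head{&a}; // list: a -> b -> c

    // "Thread 1" starts a pop: it reads head and the successor,
    // then gets preempted before its CAS.
    node* old_head = head.load();
    node* new_head = old_head->next; // &b

    // "Thread 2" pops a and b ...
    head.store(&c);
    // ... and pushes a node with the same address as a (address reuse).
    a.next = &c;
    head.store(&a); // list: a -> c, b is gone

    // "Thread 1" resumes. head compares equal to old_head again, so the
    // CAS succeeds although the list changed in between.
    const bool swapped = head.compare_exchange_strong(old_head, new_head);

    // head now points to b, which was already removed from the list.
    return swapped && head.load() == &b;
}
```

The CAS only compares the pointer value, so it cannot tell that the node behind that address has been removed and re-inserted in the meantime.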

    It is inherently difficult to design lock-free or even lock-less algorithms. Issues 2 and 3 can both be resolved by using a memory reclamation scheme. Issue 4 is usually avoided by dropping the front operation and having pop return the item instead (either directly via std::optional, or via a try_pop version that takes an out-parameter by reference and returns a bool that indicates whether the operation was successful).

    Either way, I would recommend going with one of the established lock-free algorithms like the Michael-Scott queue. Unfortunately, if you decide to implement that algorithm, you still have to take care of the memory reclamation problem.
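
As a rough illustration of both points, here is a minimal sketch in the style of the Michael-Scott queue whose try_pop returns a std::optional. It sidesteps the reclamation problem in the crudest possible way: nodes are never freed while the queue is in use, only in the destructor, so memory grows with every push. The class name leaky_ms_queue is made up, all atomics use the default sequentially consistent ordering, and T is assumed to be copy-constructible. Treat it as a starting point for experiments, not as a production implementation.

```cpp
#include <atomic>
#include <optional>
#include <utility>

template <typename T>
class leaky_ms_queue
{
    struct node
    {
        std::optional<T> m_value;            // empty in the dummy node
        std::atomic<node*> m_next{nullptr};

        node() = default;
        explicit node(const T& value) : m_value(value) {}
    };

    node* const m_first;        // very first dummy node, start of the chain
    std::atomic<node*> m_head;  // current dummy node
    std::atomic<node*> m_tail;  // last node, or the one before it

public:
    leaky_ms_queue() : m_first(new node), m_head(m_first), m_tail(m_first) {}

    // Nodes are never unlinked, so walking the chain from the first dummy
    // node frees everything. Must not run concurrently with push/try_pop.
    ~leaky_ms_queue()
    {
        for (node* n = m_first; n != nullptr;)
        {
            node* next = n->m_next.load();
            delete n;
            n = next;
        }
    }

    leaky_ms_queue(const leaky_ms_queue&) = delete;
    leaky_ms_queue& operator=(const leaky_ms_queue&) = delete;

    void push(const T& value)
    {
        node* new_node = new node(value);
        for (;;)
        {
            node* tail = m_tail.load();
            node* next = tail->m_next.load();
            if (next != nullptr)
            {
                // m_tail is lagging behind; help to advance it and retry.
                m_tail.compare_exchange_weak(tail, next);
                continue;
            }
            node* expected = nullptr;
            if (tail->m_next.compare_exchange_weak(expected, new_node))
            {
                // Linked; try to swing m_tail. Failure means another
                // thread already helped.
                m_tail.compare_exchange_strong(tail, new_node);
                return;
            }
        }
    }

    // Removes the front item and returns it, or std::nullopt if empty.
    std::optional<T> try_pop()
    {
        for (;;)
        {
            node* head = m_head.load();
            node* next = head->m_next.load();
            if (next == nullptr)
            {
                return std::nullopt; // only the dummy node is left
            }
            node* tail = m_tail.load();
            if (head == tail)
            {
                // Keep m_tail from falling behind m_head.
                m_tail.compare_exchange_strong(tail, next);
            }
            if (m_head.compare_exchange_weak(head, next))
            {
                // Only the thread that won this CAS touches the value.
                std::optional<T> result = std::move(next->m_value);
                next->m_value.reset();
                return result;
            }
        }
    }
};
```

Because nodes are never freed or reused while the queue is alive, neither the ABA problem nor the use-after-free can occur here. The price is unbounded memory growth, which is what a real memory reclamation scheme is meant to avoid.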

    I can refer you to my library Xenium, which provides not only an implementation of the Michael-Scott queue but also several memory reclamation schemes, in case you still want to experiment a bit yourself but want to avoid the hassle of safe memory reclamation.


    A memory reclamation scheme is an algorithm that solves the memory reclamation problem. Many solutions for safe memory reclamation have been proposed, like hazard pointers or epoch-based reclamation, but every scheme has its drawbacks. That's why the memory reclamation problem is still seen as currently the most difficult open problem in shared memory concurrency. For more details I can refer you to my master's thesis, Effective Memory Reclamation for Lock-Free Data Structures in C++. It not only explains the memory reclamation problem and a large number of proposed reclamation schemes, but also discusses my implementation of some of these schemes, based on a generic interface. Xenium builds on that work.