Search code examples
c++c++11atomiccompare-and-swap

How to implement Valois' Queue by C++11's CAS


I want to implement JD Valois' paper Implementing Lock-Free Queues by atomic CAS provided in C++11.

For example, Valois' algorithm defines an Enqueue function:

Enqueue(x) {
    q = new Record();
    q->value = x;
    q->next = NULL;

    do {
        p = tail;
    } while( ! CAS(p->next, NULL, q) ); // 1

    CAS(tail, p, q); // 2
}

And I wrote codes like this

struct Node {
    T * val;
    Node * next;
};
std::atomic<Node *> head, tail;

void enqueue(T * t) {
    Node * nd = new Node();
    nd->val = t;
    nd->next = nullptr;

    std::atomic<Node *> p;
    do {
        p = tail; // 1
    } while ( /* CAS1 */  ! std::atomic_compare_exchange_weak(&(p->next), nullptr, nd) );
    /* CAS2 */ std::atomic_compare_exchange_weak(&tail, p.load(), nd)
}

Then I found that the two cas functions CAS1 and CAS2 are used wrongly. For one reason, p->next is not a type of std::atomic, for another, the second argument expected of atomic_compare_exchange_weak requires a pointer. In the question Why do C++11 CAS operations take two pointer parameters?, the cas function will set *expected to current value, which causes a dereference of nullptr. Moreover, stmt 1, p = tail also fails, because atomic's operator= is deleted. So how can I implement a Lock-Free Queue according to JD Valois' paper?


Solution

  • You described all problems correctly. The next step is only fix the code.

    struct Node {
        T * val;
        std::atomic<Node *> next;
    };
    std::atomic<Node *> head, tail;
    
    void enqueue(T * t) {
        Node * nd = new Node();
        nd->val = t;
        nd->next = nullptr;
    
        Node *p, *next;
        do {
            p = tail.load(); // 1
            next = nullptr;
        } while (/* CAS1 */ !std::atomic_compare_exchange_weak(&p->next, &next, nd));
        /* CAS2 */ std::atomic_compare_exchange_weak(&tail, &p, nd);
    }