Tags: c++, concurrency, atomic, race-condition, lock-free

Is it still atomic if the desired value of std::atomic<T>::compare_exchange_weak is the return value of a non-atomic operation?


Does head.load()->next break the atomicity of the std::atomic::compare_exchange_weak used in while(head && !head.compare_exchange_weak(ptr, head.load()->next));? I believe it should, but the output (shown below, after the code sample) does not seem to confirm that. Or does this question even make sense?

Another question: is new_node->next = head.exchange(new_node); an atomic operation as a whole? Surely std::atomic::exchange itself is atomic, but what about the assignment of its return value? (Assuming the variable new_node were accessible to other threads, could another thread get in between the exchange and the assignment of the return value?)

Here is the code sample (compiled with the -std=c++17 flag):

#include <iostream>
#include <thread>
#include <atomic>
#include <iomanip>
#include <array>
#include <future>
#include <mutex>
#include <exception>
#include <condition_variable>

static constexpr unsigned num_of_thread{ 100U };

struct Node{
    unsigned data;
    Node* next;

    Node() : data{0}, next{nullptr} {}
    Node(unsigned val) : data{val}, next{nullptr} {}
};

void test1() {
    std::atomic<Node*> head{ nullptr };
    std::cout << "Is lock-free?: " << std::boolalpha << head.is_lock_free() << std::endl;

    auto add_new_node = [&head](std::mutex& mut,
                                std::condition_variable& cond_var,
                                bool& guarantor,
                                unsigned i)
    {
        // Every thread creates its own unique node, so, no race condition here
        Node* new_node{ new Node{i} };
        {
            std::unique_lock<std::mutex> lck{ mut };
            cond_var.wait(lck, [&guarantor](){ return guarantor; });
            // All threads pile up here and wait for notification
            // (basically what std::barrier does)
        }

        // Ideally, all threads hit the below line at the same time.
        // Would there be a race condition here if new_node was a shared variable?
        // (Between the "exchange" atomic operation and the assignment of return value)
        new_node->next = head.exchange(new_node);
    };

    auto pop_a_node = [&head](std::mutex& mut,
                              std::condition_variable& cond_var,
                              bool& guarantor)
    {
        Node* ptr{ head };

        {
            std::unique_lock<std::mutex> lck{ mut };
            cond_var.wait(lck, [&guarantor](){ return guarantor; });
            // All threads pile up here and wait for notification
            // (basically what std::barrier does)
        }

        // Do we break atomicity here by using head.load()->next?
        // Or even, does this question make sense? 
        while(head && !head.compare_exchange_weak(ptr, head.load()->next));

        // Using ptr from this point onwards is safe as it is local to every thread
        if(ptr) {
            unsigned retval{ ptr->data };
            delete ptr;
            
            // Protect concurrent access to std::cout (nothing else)
            // in order not to get a scrambled output
            mut.lock();
            std::cout << retval << ' ';
            mut.unlock();
        }
        else {
            mut.lock();
            std::cout << "* ";
            mut.unlock();
        }
    };

    // Half of the threads add, the other half pops nodes
    [&pop_a_node, &add_new_node]() {
        std::condition_variable cond_var{};
        bool guarantor{ false }; // Needed in order to prevent spurious wakeups
        std::mutex mut;

        std::array<std::thread, num_of_thread> threads{};
        for(unsigned i{}; i<num_of_thread; i++) {
            if(i%2U) {
                threads[i] = std::move(std::thread{add_new_node,
                                                   std::ref(mut),
                                                   std::ref(cond_var),
                                                   std::ref(guarantor),
                                                   i});
            }
            else {
                threads[i] = std::move(std::thread{pop_a_node,
                                                   std::ref(mut),
                                                   std::ref(cond_var),
                                                   std::ref(guarantor)});
            }
        }

        // Wake all threads up at the same time. This should allow
        // more race condition possibilities compared to
        // just starting new threads repeatedly 
        {
            std::lock_guard<std::mutex> lck{ mut };
            guarantor = true; // written under the mutex so it cannot race with the waiters' predicate check
        }
        cond_var.notify_all();

        for(unsigned i{}; i<num_of_thread; i++) {
            threads[i].join();
        }
    }();
}

int main() {
    test1();

    return 0;
}

And here is a sample output:

Is lock-free?: true
49 9 71 69 95 * * * 97 3 87 * 27 81 85 51 11 29 53 5 * 31 23 19 99 45 * 55 83 13 * * 75 25 33 77 73 61 93 91 35 57 67 79 63 65 47 41 17

(The *s are printed when a thread cannot pop an element because it finds the list empty. I checked that the elements missing from the output are still in the list.)


Solution

  • In the while loop

    while(head && !head.compare_exchange_weak(ptr, head.load()->next));
    

    you have three distinct atomic operations which are sequenced one after the other, but the whole expression is not atomic, so other threads can come in and do things between them:

    • head (load head and check for null)
    • head.load() (load head and then deref to get next)
    • head.compare_exchange_weak (the CAS itself)

    The obvious issue is that if two threads 'pop' at the same time, the second thread can come in and do its whole pop between steps 1 and 2 of the first thread. If there is only one node on the stack at that moment, the first thread passes the null check and then gets a nullptr on the second step (and likely crashes dereferencing it), as sketched below.
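
    To make those windows visible, here is a rough sketch (not a drop-in replacement) that spreads one iteration of that one-liner over separate statements; the numbered comments follow the order of the list above, and the Node and head definitions are repeated from the question so the snippet stands alone:

    #include <atomic>

    struct Node { unsigned data; Node* next; };
    std::atomic<Node*> head{ nullptr };

    // Roughly what each iteration of the original loop does, pulled apart so
    // the gaps between the three atomic operations become visible.
    Node* pop_sketch() {
        Node* ptr = head.load();                   // initial expected value
        for (;;) {
            if (head.load() == nullptr)            // (1) null check passes...
                return nullptr;                    //     ...list looks empty
            // <-- window: another thread may pop the last remaining node here
            Node* next = head.load()->next;        // (2) reload and dereference;
                                                   //     if head just became null,
                                                   //     this dereferences nullptr
            // <-- window: head may change again; the CAS below detects that,
            //     but only after the dereference above has already happened
            if (head.compare_exchange_weak(ptr, next))  // (3) attempt the pop
                return ptr;                        // success: the node is ours
        }
    }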

    Normally one would write this loop as

    while((ptr = head.load()) && !head.compare_exchange_weak(ptr, ptr->next));
    

    That is, you only do a single load of the head value per iteration and then use that local value. This still has the classic ABA race problem if a node can be popped, freed, reallocated and re-pushed all while one thread is trying to pop. That may or may not be a problem -- as long as the popping thread does not try to do anything with the node before it has completely popped it, it doesn't matter if the pointer got switched out to a different node: it is still a node that was pushed and hasn't been popped by anyone else. A wrapped-up version of this loop is sketched below.
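
    For completeness, here is that corrected loop wrapped into a small pop function -- a sketch under the same assumptions (the question's Node and head). The ABA and reclamation caveats above still apply, and the delete at the end is only safe if no other thread can still be reading that node:

    #include <atomic>

    struct Node { unsigned data; Node* next; };
    std::atomic<Node*> head{ nullptr };

    // Pop built around the corrected loop: head is loaded once per attempt and
    // that local snapshot is used both for the null check and for the new head
    // value handed to the CAS.
    bool pop(unsigned& out) {
        Node* ptr;
        while ((ptr = head.load()) &&                         // snapshot head once
               !head.compare_exchange_weak(ptr, ptr->next))   // retry if head moved
        { /* retry */ }
        if (!ptr)
            return false;              // list was empty
        out = ptr->data;
        delete ptr;                    // mirrors the question's test; real code
                                       // needs a reclamation scheme here
        return true;
    }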


    You have a similar problem in your push primitive

    new_node->next = head.exchange(new_node);
    

    The store into new_node->next is not atomic with the exchange, so another thread might access the new node after the head.exchange but before the store of its next pointer. So that code should instead be¹:

    do { new_node->next = head.load(); } while (!head.compare_exchange_weak(new_node->next, new_node));
    

    This ensures that the store happens before the atomic operation that publishes the newly pushed node.
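
    Wrapped up the same way, the corrected push might look like the sketch below (same assumptions as above). It relies on the documented behaviour that a failed compare_exchange_weak stores the head value it actually observed into its expected argument -- here new_node->next -- so the retry loop needs no explicit reload the way the do/while above does:

    #include <atomic>

    struct Node {
        unsigned data;
        Node* next;
        Node(unsigned val) : data{val}, next{nullptr} {}
    };
    std::atomic<Node*> head{ nullptr };

    // Push that links the new node *before* the CAS publishes it, so no other
    // thread can ever observe the node with an unset next pointer.
    void push(unsigned value) {
        Node* new_node = new Node{ value };
        new_node->next = head.load();      // tentative link to the current head
        while (!head.compare_exchange_weak(new_node->next, new_node)) {
            // the failed CAS already refreshed new_node->next with the head it
            // saw, so simply trying again is enough
        }
    }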


    ¹ Or while ((new_node->next = head.load()), !head.compare_exchange_weak(new_node->next, new_node)); if you prefer that style.