Search code examples
c++multithreadingthread-safetylock-free

Why does this lock free queue work?


I'm rewriting an old lock free queue implementation, I started by using memory_order_relaxed for everything with the intention of tightening up the memory semantics and adding standalone fences etc later. But strangely, it's working.. I've tried compiling with both XCode and VS2015 with maxxed out optimisation settings. I had code very similar to this failing about 1-1.5 years ago, the last time I wrote this.

Here's my queue:

#ifndef __LOCKFREEMPMCQUEUE_H__
#define __LOCKFREEMPMCQUEUE_H__

#include <atomic>

template <typename T> 
class LockFreeMPMCQueue
{
    public:

    explicit LockFreeMPMCQueue(size_t size)
        : m_data(new T[size])
        , m_size(size)
        , m_head_1(0)
        , m_head_2(0)
        , m_tail_1(0)
        , m_tail_2(0)
    {
    }

    virtual ~LockFreeMPMCQueue() { delete m_data; }

    bool try_enqueue(const T& value)
    {
        size_t tail = m_tail_1.load(std::memory_order_relaxed);
        const size_t head = m_head_2.load(std::memory_order_relaxed);
        const size_t count = tail - head;

        if (count == m_size)
        {
            return false;
        }

        if (std::atomic_compare_exchange_weak_explicit(&m_tail_1, &tail, (tail + 1), std::memory_order_relaxed,
                                   std::memory_order_relaxed) == false)
        {
            return false;
        }

        m_data[tail % m_size] = value;

        while (m_tail_2.load(std::memory_order_relaxed) != tail)
        {
            std::this_thread::yield();
        }
        m_tail_2.store(tail + 1, std::memory_order_relaxed);

        return true;
    }

    bool try_dequeue(T& out)
    {
        size_t head = m_head_1.load(std::memory_order_relaxed);
        const size_t tail = m_tail_2.load(std::memory_order_relaxed);

        if (head == tail)
        {
            return false;
        }

        if (std::atomic_compare_exchange_weak_explicit(&m_head_1, &head, (head + 1), std::memory_order_relaxed,
                                   std::memory_order_relaxed) == false)
        {
            return false;
        }

        out = m_data[head % m_size];

        while (m_head_2.load(std::memory_order_relaxed) != head)
        {
            std::this_thread::yield();
        }
        m_head_2.store(head + 1, std::memory_order_relaxed);

        return true;
    }

    size_t capacity() const { return m_size; }

    private:
    T* m_data;
    size_t m_size;
    std::atomic<size_t> m_head_1;
    std::atomic<size_t> m_head_2;
    std::atomic<size_t> m_tail_1;
    std::atomic<size_t> m_tail_2;
};

#endif

And here's the test I wrote:

#include <chrono>
#include <thread>
#include <vector>

#include "LockFreeMPMCQueue.h"

std::chrono::microseconds::rep test(LockFreeMPMCQueue<size_t>& queue, char* memory, const size_t num_threads, const size_t num_values)
{
    memset(memory, 0, sizeof(char) * num_values);

    const size_t num_values_per_thread = num_values / num_threads;

    std::thread* reader_threads = new std::thread[num_threads];
    std::thread* writer_threads = new std::thread[num_threads];

    auto start = std::chrono::high_resolution_clock::now();

    for (size_t i = 0; i < num_threads; ++i)
    {
        reader_threads[i] = std::thread([i, &queue, memory, num_values_per_thread]()
                        {
                            for (size_t x = 0; x < num_values_per_thread; ++x)
                            {
                                size_t value;
                                while (!queue.try_dequeue(value))
                                {
                                }
                                memory[value] = 1;
                            }
                        });
    }

    for (size_t i = 0; i < num_threads; ++i)
    {
        writer_threads[i] = std::thread([i, &queue, num_values_per_thread]()
                        {
                            const size_t offset = i * num_values_per_thread;
                            for (size_t x = 0; x < num_values_per_thread; ++x)
                            {
                                const size_t value = offset + x;
                                while (!queue.try_enqueue(value))
                                {
                                }
                            }
                        });
    }

    for (size_t i = 0; i < num_threads; ++i)
    {
        reader_threads[i].join();
        writer_threads[i].join();
    }

    auto time_taken = std::chrono::high_resolution_clock::now() - start;

    delete[] reader_threads;
    delete[] writer_threads;

    bool fail = false;
    for (size_t i = 0; i < num_values; ++i)
    {
        if (memory[i] == 0)
        {
            printf("%u = 0\n", i);
            fail = true;
        }
    }

    if (fail)
    {
        printf("FAIL!\n");
    }

    return std::chrono::duration_cast<std::chrono::milliseconds>(time_taken).count();
}

int main(int argc, char* argv[])
{
    const size_t num_threads_max = 16;
    const size_t num_values = 1 << 12;
    const size_t queue_size = 128;
    const size_t num_samples = 128;

    LockFreeMPMCQueue<size_t> queue( queue_size );
    char* memory = new char[num_values];

    const double inv_num_samples = 1.0 / double( num_samples );

    for( size_t num_threads = 1; num_threads <= num_threads_max; num_threads *= 2 )
    {
        double avg_time_taken = 0.0;

        for( size_t i = 0; i < num_samples; ++i )
        {
            avg_time_taken += test( queue, memory, num_threads, num_values ) * inv_num_samples;
        }

        printf("%u threads, %u ms\n", num_threads, avg_time_taken);
    }

    delete[] memory;

    char c;
    scanf("%c", &c);

    return 0;
}

Any help is much appreciated!


Solution

  • The memory order only specifies the minimum guarantee that you request from the generated code. The compiler and hardware are free to give stronger guarantees as they please.

    In particular, note that on x86 platforms many memory accesses are always synchronized by the hardware (for instance, loads on x86 are always sequentially consistent). That is why code that runs perfectly fine on x86 often breaks when ported to ARM or PowerPC without according for the weaker default synchronization on those platforms.

    Herb Sutter has a nice comparison table in his Atomic Weapons talk from C++ and Beyond 2012 (starts at about 31 minutes into the video; or look for the slides titled Code Generation, starting from page 34), where he shows how the different memory orderings may or may not lead to different code generated for different platforms.

    Bottom line: Just because your code seems to work fine on your machine now, does not mean that it's correct. That is one major reason why you should not mess around with the memory orderings unless you know exactly what you are doing (and even then you probably still shouldn't do it).