c++ multithreading atomic

The simplest way to dequeue atomically?


I have a set of data items that must be processed concurrently by multiple threads; the number of items is expected to be larger than the number of threads. I decided to put the data into a queue of some kind, so that each free thread can pop an item and process it until the queue is empty. I could use a plain STL queue and lock it with a mutex whenever I dequeue an element, but I'd like to try a lock-free approach. At the same time, my project is too small to depend on a third-party library that provides lock-free structures; in fact, all I need is atomic dequeuing. So I decided to implement my own queue on top of a vector, with a "head" pointer (really an index into the vector) that I increment atomically:

template <typename T>
class AtomicDequeueable
{
public:

    // Assumption: data vector never changes
    AtomicDequeueable(const std::vector<T>& data) :
        m_data(data),
        m_pointer(ATOMIC_VAR_INIT(0))
    {}

    const T * const atomicDequeue()
    {
        if (std::atomic_load(&m_pointer) < m_data.size())
        {
            return &m_data
            [
                std::atomic_fetch_add(&m_pointer, std::size_t(1))
            ];
        }

        return nullptr;
    }

private:

    AtomicDequeueable(const AtomicDequeueable<T>&) {}

    std::atomic_size_t m_pointer;
    const std::vector<T>& m_data;
};

The thread function looks as follows:

void f(AtomicDequeueable<Data>& queue)
{
    while (auto dataPtr = queue.atomicDequeue())
    {
        const Data& data = *dataPtr;
        // processing data...
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

I have very little experience with lock-free structures and primitives, so I wonder: will my approach work properly? I have of course tested it on Ideone, but I don't know how it will behave with real data.


Solution

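  • Currently, your atomicDequeue function has a race condition (a check-then-act race rather than a data race in the strict C++ sense, since both operations are atomic): two threads can both execute the atomic load and pass the size check before either of them executes the fetch-and-add. If only one element is left at that point, the second thread's fetch-and-add returns an index equal to m_data.size(), and that thread receives a pointer past the end of the vector. This can be fixed, because you really only need one atomic operation, as per the following change: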

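    const T * const atomicDequeue()
    {
        // Claim an index in a single atomic step; no two callers get the same value.
        auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1));
    
        // The counter keeps growing after the data runs out; that simply means "empty".
        if(myIndex >= m_data.size())
            return nullptr;
    
        return &m_data[myIndex];
    }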
    

    This works provided that nothing modifies the input vector while the threads are running, and that the vector outlives the queue, since the class stores only a reference to it.
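
    For completeness, here is a minimal sketch of how the whole class might look with the single fetch-and-add in place, together with a small check that every item is handed out exactly once. The helper countClaims and its parameters are hypothetical names introduced only for this illustration; the copy operations are deleted instead of being declared private, and the counter is initialised directly rather than through ATOMIC_VAR_INIT. Treat it as a sketch under the same assumption as above (the vector is never modified while workers run), not as a definitive implementation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hands out the elements of a fixed vector, one index per call.
// Assumption: the vector outlives this object and is never modified
// while worker threads are running.
template <typename T>
class AtomicDequeueable
{
public:
    explicit AtomicDequeueable(const std::vector<T>& data) :
        m_pointer(0),
        m_data(data)
    {}

    AtomicDequeueable(const AtomicDequeueable&) = delete;
    AtomicDequeueable& operator=(const AtomicDequeueable&) = delete;

    const T* atomicDequeue()
    {
        // One atomic read-modify-write: each caller receives a unique index.
        const std::size_t myIndex = m_pointer.fetch_add(1);

        if (myIndex >= m_data.size())
            return nullptr;   // nothing left to hand out

        return &m_data[myIndex];
    }

private:
    std::atomic<std::size_t> m_pointer;
    const std::vector<T>& m_data;
};

// Hypothetical helper for illustration: drains a queue of itemCount items
// with threadCount workers and returns how often each item was claimed.
inline std::vector<int> countClaims(std::size_t itemCount, std::size_t threadCount)
{
    assert(threadCount > 0);

    std::vector<std::size_t> items(itemCount);
    for (std::size_t i = 0; i < itemCount; ++i)
        items[i] = i;

    AtomicDequeueable<std::size_t> queue(items);

    // Each worker records what it claimed in its own slot, so the
    // bookkeeping itself needs no synchronisation.
    std::vector<std::vector<std::size_t>> claimed(threadCount);
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < threadCount; ++t)
    {
        workers.emplace_back([&queue, &claimed, t]
        {
            while (const std::size_t* item = queue.atomicDequeue())
                claimed[t].push_back(*item);
        });
    }
    for (auto& worker : workers)
        worker.join();

    // Tally the claims after all workers have finished.
    std::vector<int> counts(itemCount, 0);
    for (const auto& perThread : claimed)
        for (std::size_t index : perThread)
            ++counts[index];
    return counts;
}
```

    Calling countClaims(1000, 8), for example, should give a vector of 1000 entries that are all equal to 1, because no index can be returned twice. Note that the counter keeps incrementing on every call after the data is exhausted; with a std::size_t counter, wrap-around would take an impractically large number of calls, but it is worth keeping in mind if a narrower counter type is ever used.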