c++ multithreading c++11 atomic

C++11 multi-reader / multi-writer queue using atomics for object state and perpetually incremented indexes


I am using atomics and a circular buffer to implement an object pool shared by multiple reader threads and multiple writer threads.

It is difficult to investigate because instrumenting the code makes the bug vanish!

The model

Producers (or writer threads) request an Element from the Ring in order to 'prepare' it. When finished, the writer thread changes the element's state to state::Ready so that a reader can 'consume' it. The request can fail if no element is available, i.e. the next element in the Ring is not in state::Unused.

Consumers (or reader threads) request an Element from the Ring in order to 'read' it. After the reader 'releases' the element, it is back in state::Unused, i.e. available to be prepared again by a writer thread. The request can fail if no element is available, i.e. the next element in the Ring is not in state::Ready.

The 2 classes, Element and Ring

Element:

  • to be written, a writer thread must successfully exchange the _state member from state::Unused to state::LockForWrite
  • when finished, the writer thread forces the state to state::Ready (it should be the only thread handling this Element)
  • to be read, a reader thread must successfully exchange the _state member from state::Ready to state::LockForRead
  • when finished, the reader thread forces the state to state::Unused (it should be the only thread handling this Element)

Summarized:

  • writer lifecycle: state::Unused -> state::LockForWrite -> state::Ready
  • reader lifecycle: state::Ready -> state::LockForRead -> state::Unused
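To illustrate the lifecycle above, here is a minimal single-threaded sketch of the same state machine. The names SlotState, Slot, try_transition, force and lifecycle_holds are made up for this example and are not part of the code in the question. It only checks which transitions succeed and which are refused.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical names for this sketch; the four states mirror the question's enum.
enum class SlotState : int { Unused, LockForWrite, Ready, LockForRead };

class Slot {
    std::atomic<SlotState> state_{SlotState::Unused};
public:
    // Guarded transition: succeeds only if the slot is currently in `from`.
    bool try_transition(SlotState from, SlotState to) {
        return state_.compare_exchange_strong(from, to);
    }
    // Unconditional transition, meant only for the thread that holds the lock.
    void force(SlotState to) {
        const SlotState now = state_.load();
        assert(now == SlotState::LockForWrite || now == SlotState::LockForRead);
        (void)now; // avoids an unused-variable warning when NDEBUG is defined
        state_.store(to);
    }
    SlotState current() const { return state_.load(); }
};

// Walks one slot through a full write/read cycle and reports whether every
// step behaved as the lifecycle describes.
bool lifecycle_holds() {
    Slot s;
    bool ok = true;
    ok = ok && !s.try_transition(SlotState::Ready,  SlotState::LockForRead);  // nothing to read yet
    ok = ok &&  s.try_transition(SlotState::Unused, SlotState::LockForWrite); // first writer wins
    ok = ok && !s.try_transition(SlotState::Unused, SlotState::LockForWrite); // second writer is refused
    s.force(SlotState::Ready);                                                // writer publishes
    ok = ok &&  s.try_transition(SlotState::Ready,  SlotState::LockForRead);  // first reader wins
    ok = ok && !s.try_transition(SlotState::Ready,  SlotState::LockForRead);  // second reader is refused
    s.force(SlotState::Unused);                                               // reader releases
    return ok && s.current() == SlotState::Unused;
}
```

Taken alone, a single element behaves correctly: at most one thread can hold it at a time. Whatever goes wrong has to involve the indexes, not the per-element state.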

Ring

  • has a vector of Element, seen as a circular buffer.
  • std::atomic<int64_t> _read, _write; are the 2 indexes used to access the elements via :
    • _elems[ _write % _elems.size() ] for writers,
    • _elems[ _read % _elems.size() ] for readers.

When a reader has successfully locked an object for reading (state::LockForRead), the _read index is incremented. When a writer has successfully locked an object for writing (state::LockForWrite), the _write index is incremented.
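As a small sketch of how a perpetually incremented index maps onto the buffer: the helper below (RingPos and locate are hypothetical names, not from the question) splits an index into the slot it selects and the number of times it has wrapped. It assumes indexes are never negative.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper types for this sketch.
struct RingPos {
    std::size_t  slot; // which element of the vector the index selects
    std::int64_t lap;  // how many times the index has wrapped around
};

RingPos locate(std::int64_t index, std::size_t capacity) {
    assert(index >= 0 && capacity > 0);
    const std::int64_t cap = static_cast<std::int64_t>(capacity);
    return RingPos{ static_cast<std::size_t>(index % cap), index / cap };
}
```

With a capacity of 10, indexes 0 and 10 select the same slot and differ only in their lap. A thread that holds an old index value therefore cannot tell from the slot alone whether it is still looking at the lap it expected.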

The main:

We add some writer and reader threads, all sharing the same Ring, to a vector. Each thread just tries to get an element with get_for_read or get_for_write and releases it right after.

Based on the Element transitions, everything should be fine, but at some point the Ring gets blocked: the element at index _write % _elems.size() is in state::Ready, and symmetrically the element at index _read % _elems.size() is in state::Unused. Both together = deadlock.

#include<atomic>
#include<vector>
#include<thread>
#include<iostream>
#include<cstdint>
#include<functional> // std::function

typedef enum : int
{
    Unused, LockForWrite, Ready,  LockForRead
}state;

class Element
{
    std::atomic<state> _state;
public:
    Element():_state(Unused){ }

    // a reader needs to successfully make the transition Ready => LockForRead
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { state s = Unused; _state.store(s); }

    // a writer needs to successfully make the transition Unused => LockForWrite
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { state s = Ready;  _state.store(s); }
};

class Ring
{
    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;

public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) {}

    Element * get_for_read() {
        Element * ret = &_elems[ _read.load() % _elems.size() ];
        if (!ret->lock_for_read()) // if success, the object belongs to the caller thread as reader
            return NULL;
        _read.fetch_add(1); // success! incr _read index 
        return ret;
    }
    Element * get_for_write() {
        Element * ret = &_elems[ _write.load() % _elems.size() ];
        if (!ret->lock_for_write())// if success, the object belongs to the caller thread as writer
            return NULL;
        _write.fetch_add(1); // success! incr _write index
        return ret;
    }
    void release_read(Element* e) { e->unlock_read();}
    void release_write(Element* e) { e->unlock_write();}
};

int main()
{

    const int capacity = 10; // easy to process modulo

    std::atomic<bool> stop(false); // direct-initialization: copy-initialization of an atomic is ill-formed before C++17

    Ring ring(capacity);

    std::function<void()> writer_job = [&]()
    {
        std::cout << "writer starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_write())) 
                continue;
            // do some real writer job ...
            ring.release_write(e);
        }
    };
    std::function<void()> reader_job = [&]()
    {
        std::cout << "reader starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_read())) 
                continue;
            // do some real reader job ...
            ring.release_read(e);
        }
    };

    int nb_writers = 1;
    int nb_readers = 2;

    std::vector<std::thread> threads;
    threads.reserve(nb_writers + nb_readers);

    std::cout << "adding writers" << std::endl;
    while (nb_writers--)
        threads.push_back(std::thread(writer_job));

    std::cout << "adding readers" << std::endl; 
    while (nb_readers--)
        threads.push_back(std::thread(reader_job));

    // wait user key press, halt in debugger after 1 or 2 seconds
    // in order to reproduce problem and watch ring
    std::cin.get();

    stop = true;

    std::cout << "waiting all threads...\n";
    for (auto & th : threads)
        th.join();

    std::cout << "end" << std::endl;
}

This debugger watch screenshot was taken by pausing the program after it had run for 1 second. As you can see, _read is pointing to element 8, which is marked state::Unused, so nothing can unblock this reader except a writer; but the _write index is pointing to element 0, which is in state::Ready!

screenshot

My question: what did I miss here? Structurally I am sure the sequence is correct, but I am missing some atomic trick...

OS tested: rhel5/gcc 4.1.2, rhel 7/gcc 4.8, win10/ms visual 2015, win10/mingw


Solution

  • Yann's answer is correct about the problem: your threads can create "holes" in the sequence by reading and writing elements out-of-order if there's a delay between the read/write lock and the increment of the index. The fix is to verify that the index has not changed between the initial read and the increment, a la:

    class Element
    {
        std::atomic<state> _state;
    public:
        Element():_state(Unused){ }
    
        // a reader needs to successfully make the transition Ready => LockForRead
        bool lock_for_read() {
            state s = Ready;
            return _state.compare_exchange_strong(s, LockForRead);
        }
        void abort_read() { _state = Ready; }
        void unlock_read() { state s = Unused; _state.store(s); }
    
        // a writer needs to successfully make the transition Unused => LockForWrite
        bool lock_for_write() {
            state s = Unused;
            return _state.compare_exchange_strong(s, LockForWrite);
        }
        void abort_write() { _state = Unused; }
        void unlock_write() { state s = Ready;  _state.store(s); }
    };
    
    class Ring
    {
        std::vector<Element> _elems;
        std::atomic<int64_t> _read, _write;
    
    public:
        Ring(size_t capacity)
            : _elems(capacity), _read(0), _write(0) {}
    
        Element * get_for_read() {
            auto i = _read.load();
            Element * ret = &_elems[ i % _elems.size() ];
            if (ret->lock_for_read()) {
                // if success, the object belongs to the caller thread as reader
                if (_read.compare_exchange_strong(i, i + 1))
                    return ret;
                // Woops, reading out of order.
                ret->abort_read();
            }
            return NULL;
        }
        Element * get_for_write() {
            auto i = _write.load();
            Element * ret = &_elems[ i % _elems.size() ];
            if (ret->lock_for_write()) {
                // if success, the object belongs to the caller thread as writer
                if (_write.compare_exchange_strong(i, i + 1))
                    return ret;
                // Woops, writing out of order.
                ret->abort_write();
            }
            return NULL;
        }
        void release_read(Element* e) { e->unlock_read();}
        void release_write(Element* e) { e->unlock_write();}
    };
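To make the "hole" concrete, here is a single-threaded replay of one possible interleaving on a 2-slot ring. It is a sketch, not the code above: ReplayRing, CellState, finish_read, read_once, write_once, deadlocked and replay_stale_reader are hypothetical names, and the index load is split from the rest of get_for_read so that a stalled reader can be scripted. The flag selects either the question's fetch_add or the compare-exchange check on the index.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum CellState : int { Unused, LockForWrite, Ready, LockForRead };

// Hypothetical test double: same logic as Ring, with a scriptable index load.
struct ReplayRing {
    std::vector<std::atomic<int>> slots;
    std::atomic<std::int64_t> read_idx{0}, write_idx{0};
    bool checked; // true: verify the index with a CAS; false: plain fetch_add

    ReplayRing(std::size_t capacity, bool checked_index)
        : slots(capacity), checked(checked_index) {
        assert(capacity > 0);
        for (auto& s : slots) s.store(Unused);
    }
    std::size_t slot_of(std::int64_t i) const {
        return static_cast<std::size_t>(i) % slots.size();
    }
    // Everything get_for_read() does after loading the index, plus the release.
    bool finish_read(std::int64_t i) {
        std::atomic<int>& s = slots[slot_of(i)];
        int expected = Ready;
        if (!s.compare_exchange_strong(expected, LockForRead)) return false;
        if (checked) {
            if (!read_idx.compare_exchange_strong(i, i + 1)) {
                s.store(Ready); // abort_read: the index moved on, undo the lock
                return false;
            }
        } else {
            read_idx.fetch_add(1);
        }
        s.store(Unused); // release_read
        return true;
    }
    bool read_once() { return finish_read(read_idx.load()); }
    bool write_once() {
        std::int64_t i = write_idx.load();
        std::atomic<int>& s = slots[slot_of(i)];
        int expected = Unused;
        if (!s.compare_exchange_strong(expected, LockForWrite)) return false;
        if (checked) {
            if (!write_idx.compare_exchange_strong(i, i + 1)) {
                s.store(Unused); // abort_write
                return false;
            }
        } else {
            write_idx.fetch_add(1);
        }
        s.store(Ready); // release_write
        return true;
    }
    // Stuck state from the question: the reader's slot is not Ready and the
    // writer's slot is not Unused, so neither side can make progress.
    bool deadlocked() const {
        return slots[slot_of(read_idx.load())].load() != Ready
            && slots[slot_of(write_idx.load())].load() != Unused;
    }
};

// Replays one interleaving; returns true if the ring ends up stuck.
bool replay_stale_reader(bool checked_index) {
    ReplayRing ring(2, checked_index);
    ring.write_once();                         // W : slot 0 Ready, _write = 1
    std::int64_t stale = ring.read_idx.load(); // R1: loads _read == 0, then stalls
    ring.read_once();                          // R2: consumes slot 0, _read = 1
    ring.write_once();                         // W : slot 1 Ready, _write = 2
    ring.write_once();                         // W : wraps, slot 0 Ready again, _write = 3
    ring.finish_read(stale);                   // R1: resumes with the stale index 0
    return ring.deadlocked();
}
```

With fetch_add, R1 consumes slot 0 out of order and bumps _read past slot 1, which is never read: _read then points at an Unused slot and _write at a Ready one, exactly the state in the screenshot. With the index check, R1's compare-exchange fails, it puts slot 0 back to Ready, and the ring can keep going. This replay covers only this one interleaving; it is not a proof that the checked version is correct under every schedule.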