c++ boost circular-buffer boost-interprocess

boost::circular_buffer how to handle overwrite shift


I have two processes: a producer and a "consumer" that reads values but leaves them inside the buffer, where they are eventually overwritten.

But having the consumer keep track of its position is posing a bit of a problem. When the buffer is full and a value gets overwritten, index 0 refers to the next-oldest value (the one just after the value that was overwritten) and the newly inserted value sits at the last index, so every value in between shifts down by one index.

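boost::circular_buffer<int> cb(3);  // assuming a capacity of 3
cb.push_back(0);
cb.push_back(1);
cb.push_back(2);

// consumer has read cb[0] and cb[1]; the next read should be cb[2] == 2

cb.push_back(3);  // buffer is full, so the value 0 is overwritten

// contents are now 1, 2, 3: the unread value 2 has moved to cb[1], so the remembered index 2 no longer points at it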

Interestingly, iterators into the circular buffer keep pointing at the same value even after the buffer starts to be overwritten, and that would work fine except for one case: if you reach the end() iterator while reading, it still compares equal to end() after more values have been inserted. So you have to step back with std::prev(iter, 1) once you have finished consuming, and then step forward with std::next(iter, 1) when you read again after more values have been inserted, so that you don't re-read a value you have already read.


Solution

  • I believe circular_buffer exists precisely to abstract the iterator positioning away from you.

    The fact that the buffer is circular shouldn't matter to you: it's just a queue interface.

    How circular_buffer wants to be used can be seen very clearly in this example: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/example/circular_buffer_sum_example.cpp

    If you somehow want that level of control, you can either

    • use a simpler container primitive and build your own logic, or

    • write your own bounded buffer on top of circular_buffer. A full example of that is here: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/test/bounded_buffer_comparison.cpp

      The explanation mentions:

      The bounded buffer is normally used in a producer-consumer mode [...]

      [...]

      The bounded_buffer::pop_back() method does not remove the item but the item is left in the circular_buffer which then replaces it with a new one (inserted by a producer) when the circular_buffer is full. This technique is more effective than removing the item explicitly by calling the circular_buffer::pop_back() method of the circular_buffer.

    It sounds like it should help you a lot.
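
    If you go the build-your-own-logic route instead, one possible approach (a minimal sketch, not something circular_buffer provides) is to have the consumer remember an absolute sequence number rather than an index, since sequence numbers do not shift when old values are overwritten. SequencedRing and Reader below are hypothetical names made up for illustration. The sketch is single-process and unsynchronized; a two-process version would additionally need the storage and counter in shared memory, guarded by an interprocess mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of Boost: a fixed-capacity ring that
// overwrites its oldest element and counts every value ever pushed.
// T must be default-constructible.
template <typename T>
class SequencedRing {
public:
    explicit SequencedRing(std::size_t capacity) : m_slots(capacity), m_pushed(0) {
        assert(capacity > 0);
    }

    // Store the value in the slot for sequence number m_pushed, overwriting
    // the value that was pushed `capacity` pushes earlier.
    void push_back(const T& value) {
        m_slots[m_pushed % m_slots.size()] = value;
        ++m_pushed;
    }

    // Total number of values pushed so far (one past the newest sequence number).
    std::size_t pushed() const { return m_pushed; }

    // Sequence number of the oldest value that has not been overwritten yet.
    std::size_t oldest() const {
        return m_pushed > m_slots.size() ? m_pushed - m_slots.size() : 0;
    }

    // Pointer to the value with the given sequence number, or nullptr if it
    // has not been written yet or has already been overwritten.
    const T* find(std::size_t seq) const {
        if (seq >= m_pushed || seq < oldest()) return nullptr;
        return &m_slots[seq % m_slots.size()];
    }

private:
    std::vector<T> m_slots;
    std::size_t m_pushed;
};

// Consumer-side cursor: remembers the next sequence number to read, so
// overwrites in the ring never change which value comes next.
template <typename T>
class Reader {
public:
    explicit Reader(const SequencedRing<T>& ring) : m_ring(ring), m_next(0) {}

    // Return all unread values in order. If the producer has lapped the
    // reader, the lost values are skipped and reading resumes at the oldest
    // value still present.
    std::vector<T> read_new() {
        if (m_next < m_ring.oldest()) m_next = m_ring.oldest();
        std::vector<T> out;
        for (; m_next < m_ring.pushed(); ++m_next) out.push_back(*m_ring.find(m_next));
        return out;
    }

private:
    const SequencedRing<T>& m_ring;
    std::size_t m_next;
};
```

    With a capacity of 3, pushing 0 and 1, reading, and then pushing 2 and 3 makes the next read_new() return 2 and 3, even though 3 has overwritten the slot that held 0. The consumer never steps an iterator back and forth around end().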

    UPDATE

    Here's a demo adapted to use shared memory:

    #define BOOST_CB_DISABLE_DEBUG
    
    #include <boost/circular_buffer.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/call_traits.hpp>
    #include <boost/bind.hpp>
    #include <boost/interprocess/allocators/allocator.hpp>
    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/sync/interprocess_condition.hpp>
    #include <boost/interprocess/sync/interprocess_mutex.hpp>
    #include <cassert>
    #include <iostream>
    
    const unsigned long QUEUE_SIZE     = 1000L;
    const unsigned long TOTAL_ELEMENTS = QUEUE_SIZE * 1000L;
    
    namespace bip = boost::interprocess;
    
    template <class T, class Alloc, typename CV = boost::condition_variable, typename Mutex = boost::mutex>
    class bounded_buffer {
    public:
        typedef boost::circular_buffer<T, Alloc> container_type;
        typedef typename container_type::size_type                  size_type;
        typedef typename container_type::value_type                 value_type;
        typedef typename container_type::allocator_type             allocator_type;
        typedef typename boost::call_traits<value_type>::param_type param_type;
    
        bounded_buffer(size_type capacity, Alloc alloc = Alloc()) : m_unread(0), m_container(capacity, alloc) {}
    
        void push_front(param_type item) {
            boost::unique_lock<Mutex> lock(m_mutex);
    
            m_not_full.wait(lock, boost::bind(&bounded_buffer::is_not_full, this));
            m_container.push_front(item);
            ++m_unread;
            lock.unlock();
    
            m_not_empty.notify_one();
        }
    
        void pop_back(value_type* pItem) {
            boost::unique_lock<Mutex> lock(m_mutex);
    
            m_not_empty.wait(lock, boost::bind(&bounded_buffer::is_not_empty, this));
            *pItem = m_container[--m_unread];
            lock.unlock();
    
            m_not_full.notify_one();
        }
    
    private:
        bounded_buffer(const bounded_buffer&);              // Disabled copy constructor
        bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator
    
        bool is_not_empty() const { return m_unread > 0; }
        bool is_not_full() const { return m_unread < m_container.capacity(); }
    
        size_type m_unread;
        container_type m_container;
        Mutex m_mutex;
        CV m_not_empty;
        CV m_not_full;
    };
    
    namespace Shared {
        using segment = bip::managed_shared_memory;
        using smgr    = segment::segment_manager;
        template <typename T> using alloc = bip::allocator<T, smgr>;
        template <typename T> using bounded_buffer = ::bounded_buffer<T, alloc<T>, bip::interprocess_condition, bip::interprocess_mutex >;
    }
    
    template<class Buffer>
    class Consumer {
    
        typedef typename Buffer::value_type value_type;
        Buffer* m_container;
        value_type m_item;
    
    public:
        Consumer(Buffer* buffer) : m_container(buffer) {}
    
        void operator()() {
            for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
                m_container->pop_back(&m_item);
            }
        }
    };
    
    template<class Buffer>
    class Producer {
    
        typedef typename Buffer::value_type value_type;
        Buffer* m_container;
    
    public:
        Producer(Buffer* buffer) : m_container(buffer) {}
    
        void operator()() {
            for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
                m_container->push_front(value_type());
            }
        }
    };
    
    int main(int argc, char**) {
        using Buffer = Shared::bounded_buffer<int>;
    
        if (argc>1) {
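            std::cout << "Creating shared buffer\n";
            // Remove any stale segment left over from a previous run; create_only throws if it already exists.
            bip::shared_memory_object::remove("test_bounded_buffer");
            Shared::segment mem(bip::create_only, "test_bounded_buffer", 10<<20); // 10 MiB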
            Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());
    
            assert(buffer);
    
            // Initialize the buffer with some values before launching producer and consumer threads.
            for (unsigned long i = QUEUE_SIZE / 2L; i > 0; --i) {
                buffer->push_front(BOOST_DEDUCED_TYPENAME Buffer::value_type());
            }
    
            std::cout << "running producer\n";
            Producer<Buffer> producer(buffer);
            boost::thread(producer).join();
        } else {
            std::cout << "Opening shared buffer\n";
    
            Shared::segment mem(bip::open_only, "test_bounded_buffer");
            Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());
    
            assert(buffer);
    
            std::cout << "running consumer\n";
            Consumer<Buffer> consumer(buffer);
            boost::thread(consumer).join();
        }
    }
    

    When you run two processes:

    time (./test producer & sleep .1; ./test; wait)
    Creating shared buffer
    running producer
    Opening shared buffer
    running consumer
    
    real    0m0.594s
    user    0m0.372s
    sys     0m0.600s