I have 2 processes: a producer and "consumer" which still leaves the values insider the buffer and they will just be overwritten.
But having the consumer keep track is posing a bit of a problem. When the buffer is full and values get overwritten, the value pointed at index 0 is the value just ahead of the value that was just overwritten (i.e the next oldest value) and the value that was just inserted is the last index, shifting all values in between.
cb.push_back(0)
cb.push_back(1)
cb.push_back(2)
consumer reads to cb[1], cb[2] should == 2 when next read
cb.push_back(3)
cb[2] now == 1 effectively reading the old value
Interestingly iterators on circular buffer do keep the same value even when the buffer starts to get overwritten and this would work okay except if when reading you do reach the end()
iterator it will always equal the end()
iterator even after inserting more values, so you then have to std::prev(iter, 1)
after you have finished consuming and then when you go to read again after more values have been inserted do std::next(iter, 1)
so that you dont read a value you already read.
I believe circular_buffer exists precisely to abstract the iterator positioning from you.
The fact that the buffer is circular shouldn't matter to you: it's just a queue interface.
How circular_buffer wants to be used can be seen very clearly in this example: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/example/circular_buffer_sum_example.cpp
If you somehow want that level of control, you'll either
want to use simpler container primitive and build your own logic
you could write your bounded buffer on top of circular buffer. A full example of that is here: http://www.boost.org/doc/libs/1_60_0/libs/circular_buffer/test/bounded_buffer_comparison.cpp
The explanation mentions:
The bounded buffer is normally used in a producer-consumer mode [...]
[...]
The bounded buffer::pop_back() method does not remove the item but the item is left in the circular_buffer which then replaces it with a new one (inserted by a producer) when the circular_buffer is full. This technique is more effective than removing the item explicitly by calling the circular_buffer::pop_back() method of the circular_buffer.
It sounds like it should help you a lot.
Here's a demo adapted to use shared memory:
#define BOOST_CB_DISABLE_DEBUG
#include <boost/circular_buffer.hpp>
#include <boost/thread/thread.hpp>
#include <boost/call_traits.hpp>
#include <boost/bind.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <iostream>
const unsigned long QUEUE_SIZE = 1000L;
const unsigned long TOTAL_ELEMENTS = QUEUE_SIZE * 1000L;
namespace bip = boost::interprocess;
template <class T, class Alloc, typename CV = boost::condition_variable, typename Mutex = boost::mutex>
class bounded_buffer {
public:
typedef boost::circular_buffer<T, Alloc> container_type;
typedef typename container_type::size_type size_type;
typedef typename container_type::value_type value_type;
typedef typename container_type::allocator_type allocator_type;
typedef typename boost::call_traits<value_type>::param_type param_type;
bounded_buffer(size_type capacity, Alloc alloc = Alloc()) : m_unread(0), m_container(capacity, alloc) {}
void push_front(param_type item) {
boost::unique_lock<Mutex> lock(m_mutex);
m_not_full.wait(lock, boost::bind(&bounded_buffer::is_not_full, this));
m_container.push_front(item);
++m_unread;
lock.unlock();
m_not_empty.notify_one();
}
void pop_back(value_type* pItem) {
boost::unique_lock<Mutex> lock(m_mutex);
m_not_empty.wait(lock, boost::bind(&bounded_buffer::is_not_empty, this));
*pItem = m_container[--m_unread];
lock.unlock();
m_not_full.notify_one();
}
private:
bounded_buffer(const bounded_buffer&); // Disabled copy constructor
bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator
bool is_not_empty() const { return m_unread > 0; }
bool is_not_full() const { return m_unread < m_container.capacity(); }
size_type m_unread;
container_type m_container;
Mutex m_mutex;
CV m_not_empty;
CV m_not_full;
};
namespace Shared {
using segment = bip::managed_shared_memory;
using smgr = segment::segment_manager;
template <typename T> using alloc = bip::allocator<T, smgr>;
template <typename T> using bounded_buffer = ::bounded_buffer<T, alloc<T>, bip::interprocess_condition, bip::interprocess_mutex >;
}
template<class Buffer>
class Consumer {
typedef typename Buffer::value_type value_type;
Buffer* m_container;
value_type m_item;
public:
Consumer(Buffer* buffer) : m_container(buffer) {}
void operator()() {
for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
m_container->pop_back(&m_item);
}
}
};
template<class Buffer>
class Producer {
typedef typename Buffer::value_type value_type;
Buffer* m_container;
public:
Producer(Buffer* buffer) : m_container(buffer) {}
void operator()() {
for (unsigned long i = 0L; i < TOTAL_ELEMENTS; ++i) {
m_container->push_front(value_type());
}
}
};
int main(int argc, char**) {
using Buffer = Shared::bounded_buffer<int>;
if (argc>1) {
std::cout << "Creating shared buffer\n";
Shared::segment mem(bip::create_only, "test_bounded_buffer", 10<<20); // 10 MiB
Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());
assert(buffer);
// Initialize the buffer with some values before launching producer and consumer threads.
for (unsigned long i = QUEUE_SIZE / 2L; i > 0; --i) {
buffer->push_front(BOOST_DEDUCED_TYPENAME Buffer::value_type());
}
std::cout << "running producer\n";
Producer<Buffer> producer(buffer);
boost::thread(producer).join();
} else {
std::cout << "Opening shared buffer\n";
Shared::segment mem(bip::open_only, "test_bounded_buffer");
Buffer* buffer = mem.find_or_construct<Buffer>("shared_buffer")(QUEUE_SIZE, mem.get_segment_manager());
assert(buffer);
std::cout << "running consumer\n";
Consumer<Buffer> consumer(buffer);
boost::thread(consumer).join();
}
}
When you run two processes:
time (./test producer & sleep .1; ./test; wait)
Creating shared buffer
running producer
Opening shared buffer
running consumer
real 0m0.594s
user 0m0.372s
sys 0m0.600s