I implemented a single-producer-single-consumer lock-free Circular Buffer. The reference implementation is the one at: http://home.comcast.net/~lang.dennis/code/ring/ring.html (the Base-line implementation, the first listing from top).
EDIT: For reference the original code:
template <class T, size_t RingSize>
class RingBuffer
{
public:
RingBuffer(size_t size = 100)
: m_size(size), m_buffer(new T[size]), m_rIndex(0), m_wIndex(0)
{ assert(size > 1 && m_buffer != NULL); }
~RingBuffer()
{ delete [] m_buffer; };
size_t Next(size_t n) const
{ return (n+1)%m_size; }
bool Empty() const
{ return (m_rIndex == m_wIndex); }
bool Full() const
{ return (Next(m_wIndex) == m_rIndex); }
bool Put(const T& value)
{
if (Full())
return false;
m_buffer[m_wIndex] = value;
m_wIndex = Next(m_wIndex);
return true;
}
bool Get(T& value)
{
if (Empty())
return false;
value = m_buffer[m_rIndex];
m_rIndex = Next(m_rIndex);
return true;
}
private:
T* m_buffer;
size_t m_size;
// volatile is only used to keep compiler from placing values in registers.
// volatile does NOT make the index thread safe.
volatile size_t m_rIndex;
volatile size_t m_wIndex;
};
Mod: I store read and write indices in local variables and use only the local variables in expressions. I update them before returning from the functions (get and put).
For clarity the get function:
bool Get(T& value)
{
size_t w=m_wIndex;
size_t r=m_rIndex;
if (Empty(w,r))
return false;
value = m_buffer[r];
//just in case the compiler decides to be extra smart
compilerbarrier();
m_rIndex = Next(r);
return true;
}
Then, I created separate producer and consumer threads:
Producer Thread loop:
uint64_t i = 0;
while (i <= LOOPS) {
if (buf.put(i)) {
i += 1;
}
}
consumerthread.join(); //pseudocode: wait for the consumer to finish
Consumer Thread Loop:
uint64_t i=0;
while (i < LOOPS) {
buf.get(i);
}
Producer puts integers [0, LOOPS] into the buffer and the consumer gets them one at a time from the buffer until it finally gets the integer value LOOPS, and the consumer loop terminates. Please note that size of the buffer is much smaller than LOOPS.
If I keep the volatile keyword on for the read and write indices whole thing Works like a charm. The Consumer loop terminates and the producer returns.
But, if I remove the volatile keyword then the consumer never returns.
Strangely enough, This consumer loop terminates:
uint64_t i=0;
while (i < LOOPS) {
buf.get(i);
fprintf(stderr,"%lu\n",i);
}
And this one terminates as well:
uint64_t i=0, j=0;
while (j < LOOPS) {
if(buf.get(i)) {
j=i;
}
}
What is happening? I compiled the code using gcc 4.8.4 with -O3 flag set on a 64-bit intel (i3) machine running Ubuntu.
Since the issue only occurs when you remove volatile
, the issue is most likely caused by a compiler optimization that optimizes away reads of m_wIndex in the reader thread. The compiler thinks it knows everything that might change that value so there is no reason to read it from memory more than once.
Adding a call to fprintf defeats this optimization because it's possible that fprintf might modify that variable.
Adding j
made your code more complicated and also seems to defeat the optimization.