Search code examples
concurrencyembeddedreal-timecircular-buffer

How to distinguish empty from full on a lock-free SPSC ring buffer that overwrites when full?


On a bare metal microcontroller real-time embedded system, I'm using a simple single producer single consumer (SPSC) ring buffer in overwrite mode*. Since it's SPSC, it needs no locks, and can simply do:

void write(data) {
  buf[writeptr] = data;
  tmp = (writeptr + 1) % BUF_SIZE; // Avoid writeptr++ so that modification of writeptr is atomic 
  writeptr = tmp;
}
T read() {
  d = buf[readptr];
  tmp = (readptr + 1) % BUF_SIZE;
  readptr = tmp;
}
bool is_empty() {
  return writeptr == readptr;
}

The problem, however, is that, an overflow won't just lose the one datum that was overwritten: it will cause the buffer to think it is empty, effectively losing all the data. And it's not just the is_empty that's wrong once: imagine that the buffer overflows and then has 2 data written. Once those 2 data are read, the the buffer will again appear to be empty, even though there are really BUF_SIZE - 2 data in it.

Solving this without locks isn't easy. My goals:

  • A single producer, single consumer ring buffer
  • That overwrites on overflow (ie overwriting the oldest data)
  • Suitable for a light weight, real-time, bare metal, embedded microcontroller
  • Ideally without locks

Is that possible? If not: What's a minimal usage of locks that achieves the other goals?

(Platform: ARM Cortex M3 bare-metal)


  • "Ring buffers can be used in either an overwrite mode or in producer/consumer mode... Overwrite mode is where if the producer were to fill up the buffer before the consumer could free up anything, the producer will overwrite the older data. This will lose the oldest events."

Solution

  • This is relatively simple as long as your buffer is no bigger than 64kB (so that the pointers fit in 16 bits).

    Put your read and write pointers in two 16-bit halves of a 32-bit word. Use LDREX and STREX operations in the writer to atomically adjust the read pointer to discard the oldest byte if the buffer was already full.

    Modifying your pseudocode:

    void write(data) {
      buf[writeptr] = data;
      tmp = (writeptr + 1) % BUF_SIZE;
    
      ATOMIC { 
        writeptr = tmp;
    
        if (readptr == tmp) {
    
          readptr = (tmp + 1) % BUF_SIZE;
        }
      }
    }