Tags: c, multithreading, producer-consumer, lock-free, circular-buffer

Looking for the right ring buffer implementation in C


I am looking for a ring buffer implementation (or pseudocode) in C with the following characteristics:

  • multiple-producer, single-consumer (MPSC) pattern
  • consumer blocks on empty
  • producers block on full
  • lock-free (I expect high contention)

So far I've been working only with SPSC buffers - one per producer - but I would like to avoid the continuous spinning of the consumer to check for new data over all its input buffers (and maybe to get rid of some marshaling threads in my system).

I develop for Linux on Intel machines.


Solution

  • I think I have what you are looking for. It is a lock-free ring buffer implementation in which the consumer blocks on a semaphore while the buffer is empty and producers spin while it is full. You only need access to atomic primitives; in this example I will use GCC's __sync builtins.

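    It has a known bug: if you overflow the buffer by more than 100% (that is, more than size producers are waiting on a full buffer), two waiting producers end up spinning on the same slot, so the queue is not guaranteed to remain FIFO. It will still process all items eventually.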
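
    To make the FIFO caveat concrete, here is a minimal sketch of the slot arithmetic. The helper names slotForTicket and ticketsCollide are hypothetical and only for illustration; a "ticket" is the value a producer gets back from the fetch-and-add on writePosition.

    ```c
    #include <assert.h>
    #include <stddef.h>
    #include <stdint.h>

    /* Hypothetical helper: the slot a producer targets for a given ticket. */
    static size_t slotForTicket(uint64_t ticket, size_t size)
    {
        return (size_t)(ticket % size);
    }

    /* Hypothetical helper: two producers compete for the same slot exactly
     * when their tickets map to the same index. */
    static int ticketsCollide(uint64_t a, uint64_t b, size_t size)
    {
        return slotForTicket(a, size) == slotForTicket(b, size);
    }
    ```

    With size 4 and a full buffer, producers holding tickets 4 to 7 each spin on a different slot, so order is preserved. A producer holding ticket 8 spins on slot 0 together with ticket 4, and whichever compare-and-swap wins goes first.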

    This implementation relies on reads and writes of the buffer elements being atomic, which holds for aligned pointers on x86. It also uses NULL to mark an empty slot, so NULL itself cannot be queued.
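
    If you want a sanity check for that assumption on your toolchain, something like the following sketch may help. It asks the compiler whether pointer-sized atomic operations are always lock-free, which is only a proxy for "plain aligned pointer accesses are atomic". The function name pointerSlotsAreLockFree is hypothetical; ATOMIC_POINTER_LOCK_FREE comes from C11 <stdatomic.h> and __atomic_always_lock_free is a GCC/Clang builtin.

    ```c
    #include <assert.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    /* Hypothetical helper: true when the target always handles pointer-sized
     * atomics without falling back to a lock. */
    static bool pointerSlotsAreLockFree(void)
    {
        /* In C11, a value of 2 means "always lock-free". */
        return ATOMIC_POINTER_LOCK_FREE == 2
            && __atomic_always_lock_free(sizeof(void *), 0);
    }
    ```

    On x86-64 Linux with GCC I would expect this to return true; if it does not on your platform, the plain slot accesses in the code below are not safe to rely on.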

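    #include <semaphore.h>
    #include <stdint.h>
    #include <stdlib.h>
    
    struct ringBuffer
    {
       void** buffer;           //slots; NULL marks an empty slot
       uint64_t writePosition;  //next ticket, incremented atomically by producers
       size_t size;             //number of slots
       sem_t* semaphore;        //counts items that are ready for the consumer
    };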
    
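    //create the ring buffer (bufferSize is the number of slots)
    struct ringBuffer* createBuffer(size_t bufferSize)
    {
       struct ringBuffer* buf = calloc(1, sizeof(struct ringBuffer));
       buf->buffer = calloc(bufferSize, sizeof(void*));  //all slots start out NULL (empty)
       buf->size = bufferSize;
       buf->semaphore = malloc(sizeof(sem_t));
       sem_init(buf->semaphore, 0, 0);  //no items available yet
       return buf;
    }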
    
    //producer
    void addToBuffer(void* newValue, struct ringBuffer* buf)
    {
       //take a ticket; it selects the slot this producer will fill
       uint64_t writePos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size;
    
       //spin until the slot is empty (NULL), then claim it atomically
       while(!__sync_bool_compare_and_swap(&(buf->buffer[writePos]), NULL, newValue));
       sem_post(buf->semaphore);  //wake the consumer
    }
    
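    //consumer
    void processBuffer(struct ringBuffer* buf)
    {
       uint64_t readPos = 0;
       while(1)
       {
           sem_wait(buf->semaphore);  //sleep until at least one item has been posted
           void** slot = &buf->buffer[readPos % buf->size];
    
           //the producer that posted may own a later slot, so wait until this one is filled
           void* item;
           while((item = __sync_val_compare_and_swap(slot, NULL, NULL)) == NULL);
    
           //process item here
           *slot = NULL;  //mark the slot empty so a producer can reuse it
           readPos++;
       }
    }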
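
    For anyone who wants to try this out, here is a minimal self-contained sketch of a test harness, under a few assumptions of mine: the struct and producer are repeated so the block compiles alone, the spins call sched_yield(), the consumer waits for its slot to be filled and stops after a known number of items, and the names demo, producerMain, consumerMain and runDemo are hypothetical. Payloads are small non-zero integers cast to pointers, since NULL marks an empty slot.

    ```c
    #include <assert.h>
    #include <pthread.h>
    #include <sched.h>
    #include <semaphore.h>
    #include <stdint.h>
    #include <stdlib.h>

    struct ringBuffer {
        void   **buffer;         /* slots; NULL marks an empty slot */
        uint64_t writePosition;  /* next ticket */
        size_t   size;
        sem_t   *semaphore;      /* counts items ready for the consumer */
    };

    /* Hypothetical bookkeeping for the demo run. */
    struct demo {
        struct ringBuffer *buf;
        uint64_t perProducer;    /* items pushed by each producer */
        uint64_t total;          /* items the consumer should expect */
        uint64_t sum;            /* written by the consumer only */
    };

    static void addToBuffer(void *newValue, struct ringBuffer *buf)
    {
        uint64_t writePos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size;
        /* Spin until the slot is empty, yielding so the consumer can run. */
        while (!__sync_bool_compare_and_swap(&buf->buffer[writePos], NULL, newValue))
            sched_yield();
        sem_post(buf->semaphore);
    }

    static void *producerMain(void *arg)
    {
        struct demo *d = arg;
        for (uintptr_t i = 1; i <= d->perProducer; i++)
            addToBuffer((void *)i, d->buf);  /* payloads are never NULL */
        return NULL;
    }

    static void *consumerMain(void *arg)
    {
        struct demo *d = arg;
        struct ringBuffer *buf = d->buf;
        uint64_t readPos = 0;
        for (uint64_t n = 0; n < d->total; n++) {
            while (sem_wait(buf->semaphore) != 0) { }  /* retry if interrupted */
            void **slot = &buf->buffer[readPos % buf->size];
            void *item;
            /* Atomic read of the slot; wait until its producer has filled it. */
            while ((item = __sync_val_compare_and_swap(slot, NULL, NULL)) == NULL)
                sched_yield();
            d->sum += (uintptr_t)item;
            /* Only the consumer clears slots, so this swap always succeeds. */
            __sync_bool_compare_and_swap(slot, item, NULL);
            readPos++;
        }
        return NULL;
    }

    /* Runs `producers` threads that each push 1..perProducer and returns the
     * sum of everything the single consumer received. */
    static uint64_t runDemo(int producers, uint64_t perProducer, size_t bufferSize)
    {
        sem_t sem;
        struct ringBuffer buf = {0};
        buf.buffer = calloc(bufferSize, sizeof(void *));
        buf.size = bufferSize;
        buf.semaphore = &sem;
        sem_init(&sem, 0, 0);

        struct demo d = { &buf, perProducer, (uint64_t)producers * perProducer, 0 };
        pthread_t consumer;
        pthread_t *threads = calloc((size_t)producers, sizeof *threads);
        pthread_create(&consumer, NULL, consumerMain, &d);
        for (int i = 0; i < producers; i++)
            pthread_create(&threads[i], NULL, producerMain, &d);
        for (int i = 0; i < producers; i++)
            pthread_join(threads[i], NULL);
        pthread_join(consumer, NULL);

        sem_destroy(&sem);
        free(threads);
        free(buf.buffer);
        return d.sum;
    }
    ```

    Calling runDemo(4, 1000, 8) should return 4 * (1000 * 1001 / 2) if no item is lost or duplicated. Build with gcc -pthread. The sum check says nothing about ordering, so it does not exercise the FIFO caveat mentioned above.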