I have been trying to implement an MPSC (multi-producer, single-consumer) circular buffer in C for Linux. Here is the buffer structure:
/*
 * Control block of a multi-producer / single-consumer ring of pointers.
 *
 * Every field except the last is followed by a pad of
 * CACHE_LINE - sizeof(field) bytes, so each field plus its pad occupies
 * exactly CACHE_LINE bytes.
 * NOTE(review): this assumes sizeof(sem_t) <= CACHE_LINE -- confirm on the
 * target platform, otherwise the first pad size underflows.
 */
typedef struct mpsc_buffer_s {
/* Posted once per add_to_buffer(), waited on once per get_from_buffer(). */
sem_t semaphore;
unsigned char cache_pad_1[CACHE_LINE - sizeof(sem_t)];
/* Free-running producer ticket counter; reduced modulo size to pick a slot. */
uint64_t write_pos;
unsigned char cache_pad_2[CACHE_LINE - sizeof(uint64_t)];
/* Number of pointer slots in buffer[]. */
size_t size;
unsigned char cache_pad_3[CACHE_LINE - sizeof(size_t)];
/* Free-running consumer counter; reduced modulo size to pick a slot. */
uint64_t read_pos;
unsigned char cache_pad_4[CACHE_LINE - sizeof(uint64_t)];
/* Slot array; a NULL slot is treated as free by add_to_buffer(). */
void **buffer;
} mpsc_buffer_t __attribute__ ((__aligned__(CACHE_LINE)));
The following are the associated functions:
/*
 * Allocate and initialise a buffer with room for `size` pointers.
 *
 * The control block is allocated CACHE_LINE-aligned. The slot array is
 * zero-filled, because a NULL slot is what add_to_buffer() treats as free.
 *
 * Returns the new buffer, or NULL when `size` is 0 or an allocation fails.
 * A sem_init() failure is reported through ABORT_ON_ERR, as before.
 */
mpsc_buffer_t* init_mpsc_buffer(size_t size) {
    void *mem = NULL;
    mpsc_buffer_t *new_buffer;

    /* size == 0 would turn every "% size" in the add/get paths into a
       division by zero. */
    if (size == 0) {
        return NULL;
    }

    /* posix_memalign() reports failure through its return value only; the
       output pointer must not be used unless it returned 0. */
    if (posix_memalign(&mem, CACHE_LINE, sizeof(mpsc_buffer_t)) != 0) {
        return NULL;
    }
    new_buffer = mem;

    /* calloc() zero-fills the slots and rejects size * sizeof(void*)
       overflow, replacing the unchecked malloc() + memset() pair. */
    new_buffer->buffer = calloc(size, sizeof(void*));
    if (new_buffer->buffer == NULL) {
        free(new_buffer);
        return NULL;
    }

    new_buffer->size = size;
    new_buffer->read_pos = 0;
    new_buffer->write_pos = 0;

    int rc = sem_init(&new_buffer->semaphore, 0, 0);
    ABORT_ON_ERR(rc, "Semaphore init failed");

    return new_buffer;
}
/*
 * Producer side: store `element` in the next slot and post the semaphore.
 *
 * A ticket is taken from write_pos with an atomic fetch-and-add, the
 * matching slot is claimed by compare-and-swapping it from NULL to
 * `element` (busy-waiting while the slot is still occupied), and the
 * semaphore is then posted once.
 *
 * Known limitation: the post only says "one more slot has been filled",
 * not "the oldest outstanding ticket has been filled", so a producer with
 * a later ticket can post before an earlier one has stored its pointer.
 */
void add_to_buffer(mpsc_buffer_t *buffer, void *element) {
    /* Ticket first, slot address second: the slot is fixed for this call. */
    const uint64_t ticket = __sync_fetch_and_add(&buffer->write_pos, 1);
    void **const slot = &buffer->buffer[ticket % buffer->size];

    /* Busy-wait until the slot is free and we have installed our pointer. */
    for (;;) {
        if (__sync_bool_compare_and_swap(slot, NULL, element)) {
            break;
        }
    }

    /* Tell the consumer that one more element is available. */
    int rc = sem_post(&buffer->semaphore);
    ABORT_ON_ERR(rc, "Semaphore unlock failed");
}
/*
 * Consumer side: wait for one semaphore post, then take the pointer stored
 * at the current read position, clear that slot and advance read_pos.
 *
 * Returns the stored pointer. If the slot was still NULL, an error is
 * printed and NULL is returned; the slot is cleared and read_pos advanced
 * regardless.
 */
void* get_from_buffer(mpsc_buffer_t *buffer) {
    int rc = sem_wait(&buffer->semaphore);
    ABORT_ON_ERR(rc, "Semaphore wait failed");

    uint64_t read_pos = buffer->read_pos % buffer->size;
    void *element = buffer->buffer[read_pos];
    if(!element) {
        /* read_pos is 64 bits wide: "%u" expects unsigned int, so the
           argument is converted and printed with a matching specifier. */
        error_print("cannot get NULL stuff - read_pos %llu", (unsigned long long) read_pos);
    }

    /* Mark the slot free again so a producer's compare-and-swap can succeed. */
    buffer->buffer[read_pos] = NULL;
    buffer->read_pos++;
    return element;
}
I use this kind of buffer to pass pointers around. To be clear, I never send NULL pointers.
A wild bug appears when I increase the number of producers from 2 to 3: the consumer starts to read NULL values. Since I never send NULL pointers, this means that the consumer thread gets a positive semaphore but then reads a NULL value from the reading position.
On the other hand, some pointers are never cleared from the buffer, leading to potential deadlocks.
Is there a logical error in the algorithm, or could these problems be related to cache mechanisms I fail to see?
You have a race condition between incrementing the write index and assigning the entry pointer.
Consider the case where producer A increments the write index, but runs out of its timeslice. Meanwhile, producer B increments the write index again, populates the next entry -- remember, A didn't populate its entry yet --, and increments the semaphore. Now, if consumer C gets woken up before A, it has every reason to believe A has already populated its entry, and grabs it. Because it has not been populated yet, it is NULL.
In other words:
Producer A        Producer B        Consumer C
write_pos++
                  write_pos++
                  sets buffer[]
                  sem_post()
                                    sem_wait()
                                    read_pos++
                                    uses buffer[]
sets buffer[]
sem_post()
                                    sem_wait()
                                    read_pos++
                                    uses buffer[]
The more you have producers, the higher the probability that you see the above scenario.
The solution is simple: you add a second counter, write_pos2, which serializes the writers, so that they post the semaphore in the correct sequence.
Consider the following example program:
#define _POSIX_C_SOURCE 200809L
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
/*
 * Circular buffer of pointers, allocated as one block with the slot array
 * as a flexible array member.
 *
 * A NULL slot is free: cbuffer_add() installs a pointer only into a NULL
 * slot and cbuffer_get() resets the slot to NULL when it pops.
 * The three counters are free-running and are reduced modulo size when
 * used as slot indices.
 * NOTE(review): the counters are volatile, not C11 atomics; all updates
 * in this file go through the GCC __sync builtins.
 */
typedef struct {
/* Posted once per cbuffer_add(), waited on once per cbuffer_get(). */
sem_t semaphore;
/* Number of slots in entry[]. */
uint64_t size;
volatile uint64_t wrnext; /* Next free write slot */
volatile uint64_t wrindex; /* Write index, second half */
volatile uint64_t rdindex; /* Read index */
/* The slots themselves; size elements. */
void *entry[];
} cbuffer;
/*
 * Tear down a buffer made by cbuffer_create().
 *
 * Accepts NULL (nothing happens). Always returns NULL, so the caller can
 * write `cb = cbuffer_destroy(cb);` and be left with a cleared pointer.
 */
static cbuffer *cbuffer_destroy(cbuffer *const cbuf)
{
    if (!cbuf)
        return NULL;

    /* Reset the bookkeeping before the memory is handed back. */
    cbuf->size = 0;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;

    sem_destroy(&cbuf->semaphore);
    free(cbuf);

    return NULL;
}
/*
 * Allocate a circular buffer with `size` pointer slots, all initially NULL
 * (free), and a semaphore initialised to zero.
 *
 * Returns the new buffer, or NULL with errno set:
 *   EINVAL  - size is smaller than 2
 *   ENOMEM  - the requested size overflows size_t, or malloc() failed
 *   other   - whatever sem_init() reported
 */
static cbuffer *cbuffer_create(const size_t size)
{
    cbuffer *cbuf;

    if (size < 2) {
        errno = EINVAL;
        return NULL;
    }

    /* Refuse sizes for which header + size * slot would wrap around and
       silently yield a too-small allocation. */
    if (size > (SIZE_MAX - sizeof *cbuf) / sizeof cbuf->entry[0]) {
        errno = ENOMEM;
        return NULL;
    }

    cbuf = malloc(sizeof *cbuf + size * sizeof cbuf->entry[0]);
    if (!cbuf) {
        errno = ENOMEM;
        return NULL;
    }

    /* NULL marks a free slot for cbuffer_add(). */
    memset(cbuf->entry, 0, size * sizeof cbuf->entry[0]);

    /* A buffer with an unusable semaphore must not be handed out. */
    if (sem_init(&cbuf->semaphore, 0, 0) == -1) {
        const int saved_errno = errno;
        free(cbuf);
        errno = saved_errno;
        return NULL;
    }

    cbuf->size = size;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;

    return cbuf;
}
/*
 * Append `entry` to the buffer; busy-waits while the buffer is full.
 *
 * Each caller takes a ticket from wrnext. Tickets are served strictly in
 * order: a producer first waits until wrindex equals its ticket, then
 * waits for its slot to be free, stores the pointer, advances wrindex and
 * finally posts the semaphore.
 *
 * Waiting for the turn BEFORE claiming the slot matters. If the slot were
 * claimed first, two producers whose tickets differ by a multiple of size
 * compete for the same slot once the buffer has wrapped; should the later
 * ticket win, it waits forever for its turn while the earlier ticket
 * waits forever for the slot, and the consumer never receives another
 * post. That can happen as soon as there are more producers than slots.
 *
 * `entry` must not be NULL, since NULL marks a free slot.
 */
static void cbuffer_add(cbuffer *const cbuf, void *const entry)
{
    /* Take a ticket; it fixes both our turn and our slot. */
    const uint64_t ticket = __sync_fetch_and_add(&cbuf->wrnext, (uint64_t)1);
    void **const slot = &cbuf->entry[ticket % cbuf->size];

    /* Spin until every earlier ticket has been published. */
    while (cbuf->wrindex != ticket)
        ;
    __sync_synchronize();

    /* Spin while buffer full, i.e. until our slot has been popped. */
    while (!__sync_bool_compare_and_swap(slot, NULL, entry))
        ;

    /* Publish: only the current ticket holder ever advances wrindex, and
       the builtin is a full barrier, so the slot store is visible first. */
    __sync_fetch_and_add(&cbuf->wrindex, (uint64_t)1);

    /* TODO: check for -1 and errno == EOVERFLOW */
    sem_post(&cbuf->semaphore);
}
/*
 * Remove and return the oldest entry, blocking until one is available.
 *
 * The semaphore is waited on BEFORE a read index is claimed. Claiming the
 * index first lets a second consumer, holding a later index, be woken by
 * the post that belongs to an earlier entry and pop a slot that has not
 * been filled yet; it also loses an index if the thread is cancelled while
 * blocked in sem_wait().
 *
 * Returns the popped pointer, or NULL if sem_wait() fails for a reason
 * other than being interrupted by a signal handler.
 */
static void *cbuffer_get(cbuffer *const cbuf)
{
    uint64_t rdindex;

    /* Wait for a published entry; retry when a signal handler interrupts. */
    while (sem_wait(&cbuf->semaphore) == -1) {
        if (errno != EINTR)
            return NULL;
    }

    /* Get the index of the oldest entry. */
    rdindex = __sync_fetch_and_add(&cbuf->rdindex, (uint64_t)1);

    /* Pop entry: AND-ing with zero fetches the pointer and frees the slot
       in one atomic step. */
    return __sync_fetch_and_and(&cbuf->entry[rdindex % cbuf->size], NULL);
}
/* Stop flag: set to 1 by main() or by a consumer that received NULL;
   polled at the top of every worker loop and of main()'s wait loop. */
static volatile int done = 0;
/* The single buffer shared by all producer and consumer threads;
   created and destroyed by main(). */
static cbuffer *cb = NULL;
/*
 * Consumer thread body.
 *
 * `payload` carries the numeric consumer id (cast to long) used in the
 * diagnostic message. The thread pops entries from the shared buffer until
 * `done` is set. If it ever receives NULL it reports how many entries it
 * had popped, sets `done` to stop every other thread, and exits.
 *
 * Always returns NULL.
 */
void *consumer_thread(void *payload)
{
    const long id = (long)payload;
    unsigned long count = 0UL;
    void *entry;

    /* main() stops the workers with pthread_cancel(). */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        entry = cbuffer_get(cb);
        count++;

        if (!entry) {
            printf("Consumer %ld: NULL pointer at %lu encountered!\n", id, count);
            /* The message went to stdout, so that is the stream to flush
               (the original flushed stderr, leaving the message buffered). */
            fflush(stdout);
            done = 1;
            break;
        }
    }

    return NULL;
}
/*
 * Producer thread body.
 *
 * Pushes non-NULL dummy pointer values, cycling through 256..511, into the
 * shared buffer until `done` is set. `payload` is unused.
 *
 * Always returns NULL.
 */
void *producer_thread(void *payload __attribute__((unused)))
{
    unsigned long count = 0UL;

    /* The add path busy-waits, so main() relies on asynchronous
       cancellation to stop this thread. */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        cbuffer_add(cb, (void *)(256UL + (count & 255UL)));
        /* Advance the counter so successive values differ; it was never
           incremented before, so every pushed value was 256. */
        count++;
    }

    return NULL;
}
/*
 * Stress-test driver.
 *
 * Usage: PROGRAM SIZE PRODUCERS CONSUMERS
 *
 * Creates one circular buffer with SIZE slots, starts the requested number
 * of consumer and producer threads, and lets them run until SIGINT or
 * SIGTERM arrives or a worker sets `done`. The workers are then cancelled
 * and joined and the buffer is destroyed.
 *
 * Returns 0 after a normal shutdown, 1 on usage or allocation errors;
 * calls exit(1) if a thread cannot be started.
 */
int main(int argc, char *argv[])
{
pthread_attr_t attrs;
pthread_t *producer_id;
pthread_t *consumer_id;
sigset_t blocked;
siginfo_t info;
struct timespec timeout;
int producers, consumers, size, i, result;
char dummy;
/* argc is tested first, so argv[1] is only inspected when it exists. */
if (argc != 4 || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
fprintf(stderr, "\n");
fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
fprintf(stderr, " %s SIZE PRODUCERS CONSUMERS\n", argv[0]);
fprintf(stderr, "\n");
return 1;
}
/* " %d %c" converts exactly one item only when an integer is followed by
   nothing but whitespace; trailing garbage would be stored in dummy and
   make sscanf() return 2. */
if (sscanf(argv[1], " %d %c", &size, &dummy) != 1 || size < 2) {
fprintf(stderr, "%s: Invalid circular buffer size.\n", argv[1]);
return 1;
}
if (sscanf(argv[2], " %d %c", &producers, &dummy) != 1 || producers < 1) {
fprintf(stderr, "%s: Invalid number of producer threads.\n", argv[2]);
return 1;
}
if (sscanf(argv[3], " %d %c", &consumers, &dummy) != 1 || consumers < 1) {
fprintf(stderr, "%s: Invalid number of consumer threads.\n", argv[3]);
return 1;
}
/* Allocate the shared buffer and the thread-id arrays; one combined check. */
cb = cbuffer_create(size);
producer_id = malloc((size_t)producers * sizeof *producer_id);
consumer_id = malloc((size_t)consumers * sizeof *consumer_id);
if (!cb || !producer_id || !consumer_id) {
fprintf(stderr, "%s.\n", strerror(ENOMEM));
return 1;
}
/* Block SIGINT and SIGTERM before any thread exists, so that they are
   only ever collected by the sigtimedwait() call below. */
sigemptyset(&blocked);
sigaddset(&blocked, SIGINT);
sigaddset(&blocked, SIGTERM);
sigprocmask(SIG_BLOCK, &blocked, NULL);
/* Request a 32 KiB stack for every worker thread. */
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, 32768);
/* Start consumer threads. */
for (i = 0; i < consumers; i++) {
/* The payload is the 1-based thread number. */
result = pthread_create(&consumer_id[i], &attrs, consumer_thread, (void *)(1L + (long)i));
if (result) {
fprintf(stderr, "Cannot start consumer threads: %s.\n", strerror(result));
exit(1);
}
}
/* Start producer threads. */
for (i = 0; i < producers; i++) {
result = pthread_create(&producer_id[i], &attrs, producer_thread, (void *)(1L + (long)i));
if (result) {
fprintf(stderr, "Cannot start producer threads: %s.\n", strerror(result));
exit(1);
}
}
pthread_attr_destroy(&attrs);
printf("Press CTRL+C or send SIGTERM to process %ld to stop testing.\n", (long)getpid());
fflush(stdout);
/* Poll every 10 ms: leave when a worker has set `done`, or when
   sigtimedwait() returns anything other than a plain timeout
   (-1 with errno == EAGAIN), i.e. a signal arrived or the call failed. */
while (1) {
if (done)
break;
timeout.tv_sec = (time_t)0;
timeout.tv_nsec = 10000000L; /* 0.010000000 seconds */
result = sigtimedwait(&blocked, &info, &timeout);
if (result != -1 || errno != EAGAIN) {
done = 1;
break;
}
}
printf("Exiting...\n");
fflush(stdout);
/* Cancel every worker first, then join them all. */
for (i = 0; i < producers; i++)
pthread_cancel(producer_id[i]);
for (i = 0; i < consumers; i++)
pthread_cancel(consumer_id[i]);
for (i = 0; i < producers; i++)
pthread_join(producer_id[i], NULL);
for (i = 0; i < consumers; i++)
pthread_join(consumer_id[i], NULL);
/* All workers are gone; release the buffer and the id arrays. */
cb = cbuffer_destroy(cb);
free(producer_id);
free(consumer_id);
return 0;
}
While I could be wrong about this, I can run the above with any number of producers (with a single consumer only, obviously) without encountering NULL pointers. You can easily add some logic to verify the pointers.
I believe you're spinning quite a lot, even in the uncontested case.
I would personally consider using two linked lists instead: one for unused/free slots, and the other for the added entries. (If your pointed-to entries start with a next pointer field, then you only need the used list. I prefer this, myself.)
Producers always grab the first node from the free list, and prepend it to the used list. The consumer grabs the entire used list. All of these operations use a simple do { } while (!__sync_bool_compare_and_swap()); loop, or do { } while (!__atomic_compare_exchange()); for GCC 4.7 and later, with the loop normally executing only once when there is no contention. Something similar to the following -- untested -- code:
/* Singly linked list node; the link comes first and the payload follows. */
struct node {
/* Next node in the list, or NULL at the end. */
struct node *next;
/* whatever data here */
};
/*
 * Atomically prepend `item` to the list whose head pointer is `*list`.
 *
 * The current head is read, `item` is linked in front of it, and the head
 * is replaced with `item` by compare-and-swap. If another thread changed
 * the head in between, the swap fails and the sequence is retried.
 *
 * Two defects of the earlier version are fixed: the loop condition was
 * missing a closing parenthesis, and item->next was set to the head's
 * successor instead of the head itself, which dropped the head node and
 * made the swap compare against the wrong pointer.
 */
void add_one(volatile struct node **const list, struct node *item)
{
    struct node *head;

    do {
        /* The cast drops the volatile qualifier of the pointed-to node so
           the value can be stored in the plain `next` field. */
        head = (struct node *)*list;
        item->next = head;
    } while (!__sync_bool_compare_and_swap(list, head, item));
}
/*
 * Atomically detach and return the first node of the list whose head
 * pointer is `*list`, or NULL if the list is empty.
 *
 * The returned node has its `next` field cleared.
 *
 * The loop condition was missing a closing parenthesis; that is fixed.
 *
 * NOTE(review): if several threads call this on the same list, a node can
 * be popped and pushed back by others between the read of first->next and
 * the compare-and-swap; the swap then succeeds with a stale successor
 * (the ABA problem). Confirm how many threads pop from each list.
 */
struct node *get_one(volatile struct node **const list)
{
    struct node *first, *next;

    do {
        first = (struct node *)*list;
        next = first ? first->next : NULL;
    } while (!__sync_bool_compare_and_swap(list, first, next));

    /* Unlink the node completely before handing it to the caller. */
    if (first)
        first->next = NULL;

    return first;
}
/*
 * Atomically take every node off the list whose head pointer is `*list`,
 * leaving the list empty, and return the nodes in reversed order.
 *
 * Returns NULL when the list was empty.
 */
struct node *get_all(volatile struct node **const list)
{
    struct node *pending;
    struct node *reversed = NULL;

    /* Swap the head for NULL; retry if the head changed under us. */
    for (;;) {
        pending = (struct node *)*list;
        if (__sync_bool_compare_and_swap(list, pending, NULL))
            break;
    }

    /* Move the nodes one by one to the front of the result chain. */
    while (pending != NULL) {
        struct node *const following = pending->next;

        pending->next = reversed;
        reversed = pending;
        pending = following;
    }

    return reversed;
}
Note that the above get_all() reverses the list, so that the oldest entry is first in the returned list. This makes it easy for the consumer to process all entries in the order they were added, with minimal overhead in the common case.
Questions?