Search code examples
cqueuedeadlockfifo

FIFO implementation, deadlock


#include <math.h>
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MIN(x, y)         ((x) < (y) ? (x) : (y))

#define TASK_LENGHT pow(10, 5) /* 0.1s */
#define TASK_COUNT 100
#define THREADS_CAP 1

struct fifo {
    size_t out;
    size_t in;
    size_t size;
    size_t len;
    pthread_spinlock_t lock;
    char data[1];
};

struct fifo *
fifo_alloc(size_t size)
{
    struct fifo *fifo;

    fifo = malloc(offsetof(struct fifo, data) + size);
    fifo->out = fifo->in = 0;
    fifo->size = size;
    fifo->len = 0;
    pthread_spin_init(&fifo->lock, PTHREAD_PROCESS_PRIVATE);
    return fifo;
}

void
fifo_free(struct fifo *fifo)
{
    pthread_spin_destroy(&fifo->lock);
    free(fifo);
}

size_t
fifo_in(struct fifo *fifo, const void *data, size_t len)
{
    size_t total, remaining, l;

    pthread_spin_lock(&fifo->lock);
    total = remaining = MIN(len, fifo->size - fifo->len);
    while (remaining > 0) {
        l = MIN(remaining, fifo->size - fifo->in);
        memcpy(fifo->data + fifo->in, data, l);
        fifo->in = (fifo->in + l) % fifo->size;
        fifo->len += l;
        data = (char *)data + l;
        remaining -= l;
    }
    pthread_spin_unlock(&fifo->lock);
    return total;
}

size_t
fifo_out(struct fifo *fifo, void *data, size_t len)
{
    size_t total, remaining, out, l;

    pthread_spin_lock(&fifo->lock);
    total = remaining = MIN(len, fifo->len);
    while (remaining > 0) {
        l = MIN(remaining, fifo->size - fifo->out);
        memcpy(data, fifo->data + fifo->out, l);
        fifo->out = (fifo->out + l) % fifo->size;
        data = (char *)data + l;
        remaining -= l;
    }
    fifo->len -= total;
    pthread_spin_unlock(&fifo->lock);
    return total;
}

int
fifo_is_empty(struct fifo *fifo)
{
    int empty;

    pthread_spin_lock(&fifo->lock);
    empty = fifo->len == 0;
    pthread_spin_unlock(&fifo->lock);
    return empty;
}

void *
process_task(void *datap)
{
    struct fifo *tasks = datap;
    int data;

    while (1) {
        puts("it blocks there");
        if (!fifo_is_empty(tasks)) {
            puts("this code never execute");
            fifo_out(tasks, &data, sizeof(data));
            if (data == -1) pthread_exit(0);
            printf("%lu => %d\n", pthread_self(), data);
            usleep(TASK_LENGHT);
        }
        puts("this code never execute");
    }
}

int
main(void) {
    pthread_t threads[THREADS_CAP];
    struct fifo *tasks;
    int data;
    size_t i;

    tasks = fifo_alloc(128);
    for (i = 0; i < THREADS_CAP; i++)
        pthread_create(threads + i, NULL, process_task, &tasks);

    for (i = 0; i < TASK_COUNT; i++)
        fifo_in(tasks, &i, sizeof(i));

    for (i = 0; i < THREADS_CAP; i++) {
        data = -1;
        fifo_in(tasks, &data, sizeof(i));
    }

    puts("wait for threads to process all tasks");
    for (i = 0; i < THREADS_CAP; i++) {
        pthread_join(threads[i], NULL);
    }

    fifo_free(tasks);
}

I debugged my program and it turned out that it blocks at the pthread_spin_lock in fifo_is_empty. I was trying to recreate this problem with the minimum code needed like this:

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define THREADS_CAP 10000

struct test {
    pthread_spinlock_t lock;
};

pthread_barrier_t barrier;
int counter = 0;


void *
foo(void *data) {
    struct test *test = data;

    pthread_barrier_wait(&barrier);
    pthread_spin_lock(&test->lock);
    counter++;
    pthread_spin_unlock(&test->lock);
    return 0;
}

int main() {
    struct test *test;
    pthread_t threads[THREADS_CAP];
    size_t i;

    test = malloc(sizeof(test));
    pthread_barrier_init(&barrier, NULL, THREADS_CAP);
    pthread_spin_init(&test->lock, PTHREAD_PROCESS_PRIVATE);
    for (i = 0; i < THREADS_CAP; i++)
        pthread_create(threads + i, NULL, foo, test);

    for (i = 0; i < THREADS_CAP; i++)
        pthread_join(threads[i], NULL);
    printf("%d\n", counter);

    pthread_spin_destroy(&test->lock);
    pthread_barrier_destroy(&barrier);
    return 0;
}

but i was not able to. I just works for some reason in contrast to my previous code. Does anyone have an idea what is causing there problem and how to fix it?


Solution

  • The problem took me a while to sort out, but I arrived there through working on a minimal reproducible example. I ultimately cut out everything related to managing the FIFO data, some unneeded functions, and all of the looping, but I found that the failure behavior did not manifest if I changed the definition of struct fifo.

    Finally, I realized that the problem is here:

        struct fifo *tasks;
    

    // ...

            pthread_create(threads + i, NULL, process_task, &tasks);
    

    in conjunction with

    void *process_task(void *datap) {
        struct fifo *tasks = datap;
    

    Bottom line: you are passing the address of your tasks pointer, a pointer to a pointer, but then your thread function tries to interpret it as a single pointer. Undefined behavior results.

    The correction is to pass tasks itself to the thread, not &tasks:

            pthread_create(threads + i, NULL, process_task, tasks);
    

    Your program has other problems, but that resolves the one you asked about.