Search code examples
cmultithreadingatomicproducer-consumerlock-free

Why do my consumer threads stop before my producer threads are done?


I have recently written a bounded lock free queue and was making some tests for it. In the test, some threads produce prime numbers (by starting at some number, counting up by 6 times the number of pairs of producer threads, checking every number using a Deterministic Miller Rabin test, and inserting the primes into a queue) and some threads consume prime numbers (by removing elements from the queue and checking if they are prime). The producer threads come in pairs with one in each pair producing prime numbers equal to 1 mod 6 and the other producing prime numbers equal to 5 mod 6 (all numbers equal to 0, 2, 3, or 4 mod 6 are composite besides 2 and 3), the main thread produces 2 and 3. There is a global counter of how many threads are not done producing. Every time a producer thread or main finishes generating prime numbers it atomically decrements this counter. The consumer threads loop while it is not 0.

To determine whether or not the prime numbers actually make it through the queue, I compute the 0th through 3rd moments of the prime numbers produced and consumed by every thread and check the sums of the moments for the producer threads equal the sums of the moments for the consumer threads. The nth moment is just the sum of the nth powers, so this means the number of primes, their sum, the sum of their squares, and the sum of their cubes, all match. All the moments will match if the sequences are permutations of each other, so while I would need to check the first n to ensure sequences of length n are actually permutations, the first 4 matching means the chance of the sequences not matching is incredibly small.

My lock free queue actually works, but for some reason the consumer threads all stop while there are still elements in the queue. I don't understand why because the producer threads only decrement the producing counter after they have inserted all of their primes into the queue, and the producing counter can only equal 0 after all of the producing threads have decremented it. Thus, whenever the producing counter is 0 all of the elements have been inserted into the queue. But then if a consumer attempts to remove an element it should succeed because remove only fails if queue.full (the number of elements in the queue) is 0. So when the producing counter is 0, the consumers should be able to successfully consume until queue.full is 0 and should not check the producing counter and return until the queue has been exhausted. They only check the producing counter if a remove fails (in case the consumers are faster than the producers and empty the queue).

However, when I make the while loop around remove check queue.full in addition to the producing counter, the consumers do not return early. That is, when I change

__atomic_load_n(&producing, __ATOMIC_SEQ_CST)

to

__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)

it just works. Note that my code uses a reasonable amount of gcc extensions such as attributes, __atomic builtins, __auto_type, statement expressions, 128 bit integers, __builtin_ctzll, and '\e', C99 features such as designated initializers and compound literals, and pthreads. I'm also using the sequentially consistent memory order and strong compare and swap everywhere even though weaker versions should work because I don't want problems from that while I have problems from this. Here is the header queue.h:

#ifndef __QUEUE_H__
#define __QUEUE_H__

#include <stddef.h>
#include <inttypes.h>

typedef struct __attribute__((__designated_init__)){//using positional initializers for a struct is terrible
    void *buf;
    uint8_t *flags;//insert started, insert complete, remove started
    size_t cap, full;
    uint64_t a, b;
} queue_t;

typedef struct __attribute__((__designated_init__)){
    size_t size;
} queue_ft;//this struct serves as a class for queue objects: any data specific to the object goes in the queue_t struct and any shared data goes here

int queue_insert(queue_t*, const queue_ft*, void *elem);

int queue_remove(queue_t*, const queue_ft*, void *out);

int queue_init(queue_t*, const queue_ft*, size_t reserve);

void queue_destroy(queue_t*, const queue_ft*);

#endif

Here is the library source queue.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include "queue.h"

int queue_insert(queue_t *self, const queue_ft *ft, void *elem){
    uint64_t i;
    while(1){
        uint8_t flag = 0;
        if(__atomic_load_n(&self->full, __ATOMIC_SEQ_CST) == self->cap){
            return 0;
        }
        i = __atomic_load_n(&self->b, __ATOMIC_SEQ_CST);
        if(__atomic_compare_exchange_n(self->flags + i, &flag, 0x80, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the insert started flag if all flags are clear
            break;
        }
    }
    __atomic_fetch_add(&self->full, 1, __ATOMIC_SEQ_CST);
    uint64_t b = i;
    while(!__atomic_compare_exchange_n(&self->b, &b, (b + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the b endpoint of the queue with wraparaound
    memcpy(self->buf + i*ft->size, elem, ft->size);//actually insert the item.  accesses to the buffer mirror accesses to the flags so this is safe
    __atomic_thread_fence(memory_order_seq_cst);
    __atomic_store_n(self->flags + i, 0xc0, __ATOMIC_SEQ_CST);//set the insert completed flag
    return 1;
}

int queue_remove(queue_t *self, const queue_ft *ft, void *out){
    uint64_t i;
    while(1){
        uint8_t flag = 0xc0;
        if(!__atomic_load_n(&self->full, __ATOMIC_SEQ_CST)){
            return 0;
        }
        i = __atomic_load_n(&self->a, __ATOMIC_SEQ_CST);
        if(__atomic_compare_exchange_n(self->flags + i, &flag, 0xe0, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the remove started flag if insert started and insert completed are set but the other flags are clear
            break;
        }
    }
    __atomic_fetch_sub(&self->full, 1, __ATOMIC_SEQ_CST);
    uint64_t a = i;
    while(!__atomic_compare_exchange_n(&self->a, &a, (a + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the a endpoint of the queue with wraparound
    memcpy(out, self->buf + i*ft->size, ft->size);//actually remove the item.
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    __atomic_store_n(self->flags + i, 0x00, __ATOMIC_SEQ_CST);//clear all the flags to mark the remove as completed
    return 1;
}

int queue_init(queue_t *self, const queue_ft *ft, size_t reserve){
    void *buf = malloc(reserve*ft->size);
    if(!buf){
        return 0;
    }
    uint8_t *flags = calloc(reserve, sizeof(uint8_t));
    if(!flags){
        free(buf);
        return 0;
    }
    *self = (queue_t){
        .buf=buf,
        .flags=flags,
        .cap=reserve,
        .full=0,
        .a=0,.b=0
    };
    return 1;
}

void queue_destroy(queue_t *self, const queue_ft *ft){
    free(self->buf);
    free(self->flags);
}

And here is the test program source test_queue_pc.c:

#define _POSIX_C_SOURCE 201612UL

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <pthread.h>
#include <math.h>
#include <time.h>
#include "queue.h"

//Generate primes up to this number.  Note 78498 is the number of primes below 1000000; this is hard coded because the queue does not support growing yet.
#define MAX 1000000
#define QUEUE_SIZE 78498
#define NUM_PRODUCER_PAIRS 3
#define NUM_CONSUMERS 2
//Every producer and consumer thread calculates the 0th through 3rd moments of the sequence of primes it sees, as well as testing them for primality.
//The nth moment is the sum of the nth powers, thus, the order does not matter and if the primes are the same in both the producers and the consumers
//then the sums of the moments will also be the same.  I check that the 0th through 3rd moments match which means it is nearly certain the primes go through
//the queue.
#define NUM_MOMENTS 4

//Deterministic Miller Rabin witnesses (see https://en.wikipedia.org/wiki/Miller–Rabin_primality_test)
#define DMR_PRIMES (uint64_t[]){2, 13, 23, 1662803}
#define DMR_PRIMES_C 4

//Macro to split an integer into three parts.  The first part has the 2**0, 2**3, 2**6, ..., 2**60 bits of the original and 0 elsewhere.
//The second part has the 2**1, 2**4, 2**7, ..., 2**61 bits of the original and 0 elsewhere.  The last part has the 2**2, ..., 2**62 bits.
//The 2**63 bit is lost.  The masks represent the sums of geometric sequences.  The original number can be obtained by bitwise or or xor on the parts.
//I spread the uint64_t's (which are unsigned long longs) over 3 uint64_t's so that they take up 24 bytes and memcpy'ing them happens in multiple steps.
//This spreading is only done on primes that have been produced before they are put into the queue.  The consumers then recombine and verify them.
#define SPREAD_EMPLACE(n) ({__auto_type _n = (n); &(spread_integer){(_n)&(((1ULL<<60)-1)/7), (_n)&(((1ULL<<61)-2)/7), (_n)&(((1ULL<<62)-4)/7)};})

typedef struct{
    uint64_t x, y, z;
} spread_integer;

queue_ft spread_integer_ft = {.size= sizeof(spread_integer)};

queue_t Q;
//Start producing count at 1 + (NUM_PRODUCING_THREADS << 1) because main generates 2 and 3 and reduce it by 1 every time a producer thread finishes
int producing = 1 + (NUM_PRODUCER_PAIRS << 1);

//Uses the binary algorithm for modular exponentiation (https://en.wikipedia.org/wiki/Exponentiation_by_squaring)
//It is a helper function for isPrime
uint64_t powmod(unsigned __int128 b, uint64_t e, uint64_t n){
    unsigned __int128 r = 1;
    b %= n;
    while(e){
        if(e&1){
            r = r*b%n;
        }
        e >>= 1;
        b = b*b%n;
    }
    return (uint64_t)r;
}

//uses deterministic Miller Rabin primality test
int isPrime(uint64_t n){
    uint64_t s, d;//s, d | 2^s*d = n - 1
    if(n%2 == 0){
        return n == 2;
    }
    --n;
    s = __builtin_ctzll(n);
    d = n>>s;
    ++n;
    for(uint64_t i = 0, a, x; i < DMR_PRIMES_C; ++i){
        a = DMR_PRIMES[i];
        if(a >= n){
            break;
        }
        x = powmod(a, d, n);
        if(x == 1 || x == n - 1){
            goto CONTINUE_WITNESSLOOP;
        }
        for(a = 0; a < s - 1; ++a){
            x = powmod(x, 2, n);
            if(x == 1){
                return 0;
            }
            if(x == n - 1){
                goto CONTINUE_WITNESSLOOP;
            }
        }
        return 0;
        CONTINUE_WITNESSLOOP:;
    }
    return 1;
}

void *produce(void *_moments){
    uint64_t *moments = _moments, n = *moments;//the output argument for the 0th moment serves as the input argument for the number to start checking for primes at
    *moments = 0;
    for(; n < MAX; n += 6*NUM_PRODUCER_PAIRS){//the producers are paired so one of every pair generates primes equal to 1 mod 6 and the other equal to 5 mod 6.  main generates 2 and 3 the only exceptions
        if(isPrime(n)){
            for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                moments[i] += m;
            }
            if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
                fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
                exit(EXIT_FAILURE);
            }
        }
    }
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);//this thread is done generating primes; reduce producing counter by 1
    return moments;
}

void *consume(void *_moments){
    uint64_t *moments = _moments;
    while(__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)){//busy loop while some threads are producing
        spread_integer xyz;
        if(queue_remove(&Q, &spread_integer_ft, &xyz)){
            uint64_t n = xyz.x | xyz.y | xyz.z;
            if(isPrime(n)){
                for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                    moments[i] += m;
                }
            }else{
                fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
                exit(EXIT_FAILURE);
            }
        }
    }
    return moments;
}

int main(void){
    if(!queue_init(&Q, &spread_integer_ft, QUEUE_SIZE)){
        fprintf(stderr, "\e[1;31mERROR: Could not initialize queue.\e[0m\n");
        exit(EXIT_FAILURE);
    }
    pthread_t producers[NUM_PRODUCER_PAIRS << 1], consumers[NUM_CONSUMERS];
    uint64_t moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1][NUM_MOMENTS] = {};//the 2 extras are because main produces the primes 2 and 3 and consumes primes the consumers leave behind
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//create consumers first to increase likelihood of causing bugs
        if(pthread_create(consumers + i, NULL, consume, moments[(NUM_PRODUCER_PAIRS << 1) + 1 + i])){
            fprintf(stderr, "\e[1;31mERROR: Could not create consumer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS; ++i){
        moments[i << 1][0] = 5 + 6*i;
        if(pthread_create(producers + (i << 1), NULL, produce, moments[i << 1])){
            fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        moments[(i << 1) + 1][0] = 7 + 6*i;
        if(pthread_create(producers + (i << 1) + 1, NULL, produce, moments[(i << 1) + 1])){
            fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    for(uint64_t n = 2; n < 4; ++n){
        for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
            moments[NUM_PRODUCER_PAIRS << 1][i] += m;
        }
        if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
            fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);
    uint64_t c = 0;
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//join consumers first to bait bugs.  Note consumers should not finish until the producing counter reaches 0
        void *_c;
        if(pthread_join(consumers[i], &_c)){
            fprintf(stderr, "\e[1;31mERROR: Could not join consumer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        c += (uintptr_t)_c;
    }
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS << 1; ++i){
        if(pthread_join(producers[i], NULL)){
            fprintf(stderr, "\e[1;31mERROR: Could not join producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    //this really should not be happening because the consumer threads only return after the producing counter reaches 0,
    //which only happens after all of the producer threads are done inserting items into the queue.
    if(Q.full){
        fprintf(stdout, "\e[1;31mWTF: Q.full != 0\nproducing == %d\e[0m\n", producing);
    }
    while(Q.full){
        spread_integer xyz;
        if(!queue_remove(&Q, &spread_integer_ft, &xyz)){
            fprintf(stderr, "\e[1;31mERROR: Could not remove from non empty queue.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        uint64_t n = xyz.x | xyz.y | xyz.z;
        if(isPrime(n)){
            for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS][i] += m;
            }
        }else{
            fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    queue_destroy(&Q, &spread_integer_ft);
    for(uint64_t i = 0, p, c, j; i < NUM_MOMENTS; ++i){
        for(j = p = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1; ++j){
            p += moments[j][i];
        }
        for(c = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1; ++j){
            c += moments[j][i];
        }
        printf("Moment %"PRIu64" %"PRIu64" -> %"PRIu64"\n", i, p, c);
    }
}

I then compile with

gcc -o test_queue_pc queue.c test_queue_pc.c -Wall -std=c99 -g -O0 -pthread -fuse-ld=gold -flto -lm

Why do the consumer threads return before the queue is empty even though they wait for the producers to be done, when they loop just on producing, but do the correct thing when they loop on producing || Q.full?


Solution

  • Why do the consumer threads return before the queue is empty even though they wait for the producers to be done, when they loop just on producing, but do the correct thing when they loop on producing || Q.full?

    Because having no more producers mean no new entries will be added to the queue; it does not mean the queue is already empty.

    Consider the case where the producers are faster than the consumers. They add their stuff to the queue, and exit. At this point, there are items in the queue, but the active producer count is zero. If the consumers were to check only whether there are active producers, they would miss the items already in the queue.


    It is important to note that the check

    if ((active producers) || (items in queue))
    

    is the correct one in C99, here. (The || operator has a sequence point after the left side is evaluated. That is, the right side is never evaluated before the left side.)

    If you only check for active producers, you miss the case where the producers are faster than the consumers, and exit while there are still items in the queue.

    If you only check for items in the queue, you miss the case where the producers are still adding stuff to the queue.

    If you check if the queue is empty first, you open up a race window. After the consumer checking if the queue is empty, but before the consumer checks if there are active producers, a producer could add one or more items to the queue and exit.

    You need to check if there are active producers first. If there are active producers, and the queue is empty now, then the consumer must wait if new items arrive in the queue (until the active producer count drops to zero, or a new item arrives in the queue.) If there are no active producers, the consumer must check if there are items in the queue. No active producers means no new items will appear in the queue, but it does not mean the queue is already empty.