c++ pthreads semaphore producer-consumer

Multiple producer-consumer problem sticks on last consume


I'm trying to solve a multiple producer-consumer problem using pthreads and semaphores, but it always gets stuck at the last consume and hangs. There are NO_ITEMS items in total, and the buffer has size BUFFER_SIZE.

This is my current code:

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100

using namespace std;

void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();

sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;

stack<int> items;
static int count = 0;



int main()
{
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_mutex_init(&mutex, nullptr);
    pthread_t p1, c1, c2, c3;

    pthread_create(&p1, nullptr, thread_producer, nullptr);
    pthread_create(&c1, nullptr, thread_consumer, nullptr);
    pthread_create(&c2, nullptr, thread_consumer, nullptr);
    pthread_create(&c3, nullptr, thread_consumer, nullptr);

    pthread_join(p1, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    pthread_mutex_destroy(&mutex);

    return 0;
}

void* thread_consumer(void* args) {

    while (count < NO_ITEMS) {
        sem_wait(&fillCount);
        pthread_mutex_lock(&mutex);

        if (!items.empty() && count < NO_ITEMS - 1) {
            removeItem();
        }

        count++;
        pthread_mutex_unlock(&mutex);
        sem_post(&emptyCount);
    }

    return nullptr;
}

void* thread_producer(void* args) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mutex);

        addItem(i);
        // sleep(1);

        pthread_mutex_unlock(&mutex);
        sem_post(&fillCount);
    }

    return nullptr;

}

void addItem(int i) {
    cout << "Produced: " << i << endl;
    items.push(i);
}

void removeItem() {
    cout << "Consumed: " << items.top() << endl;
    items.pop();

}

This is part of the output:

Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt

Solution

    Flawed logic

    Your code has a logic problem. Suppose NO_ITEMS is 100, and 99 have so far been consumed. Let two consumer threads arrive at the top of the while loop at that point, and suppose that both read count as 99 (but see below), and therefore enter the body of the loop. Both consumers will then wait in sem_wait(), but there is at most one more item to be produced, so the producer will increment the semaphore at most once more, leaving at least one of the consumers blocked indefinitely.

    Undefined behavior

    Moreover, your thread_consumer() function contains a data race, leaving your program's behavior undefined. Specifically, the read of the shared variable count in the while condition is not properly synchronized. Although one cannot reliably predict how UB will manifest (else it would not be "undefined"), it is fairly common for unsynchronized accesses to manifest as apparent failures of one thread to see another thread's updates to shared variables. Such a failure mode would explain your particular observed behavior all by itself.

    Very likely, a correct fix to this synchronization problem would also fix the logic problem.
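
    As a rough illustration of how a synchronization fix can also remove the logic problem, here is a minimal sketch in which each consumer tests and updates the shared counter inside one critical section before it waits on the semaphore. This is only one possible reading, not the only correct fix. The names claimed, consumed, and run_claim_sketch are hypothetical, the sizes are reduced so it runs quickly, and it assumes a platform where unnamed POSIX semaphores (sem_init) work, such as Linux.

```cpp
#include <cassert>
#include <pthread.h>
#include <semaphore.h>
#include <stack>

// Reduced sizes for a quick run; the question uses 50 and 100.
static const int BUFFER_SIZE = 5;
static const int NO_ITEMS = 20;

static sem_t fillCount, emptyCount;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static std::stack<int> items;
static int claimed = 0;   // hypothetical: items some consumer has committed to take
static int consumed = 0;  // hypothetical: items actually removed

static void* producer(void*) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mtx);
        items.push(i);
        pthread_mutex_unlock(&mtx);
        sem_post(&fillCount);
    }
    return nullptr;
}

static void* consumer(void*) {
    for (;;) {
        // Test and update the counter in one critical section, so that at
        // most NO_ITEMS waits on fillCount are ever started in total.
        pthread_mutex_lock(&mtx);
        bool more = claimed < NO_ITEMS;
        if (more) claimed++;
        pthread_mutex_unlock(&mtx);
        if (!more) break;

        sem_wait(&fillCount);  // every started wait is matched by one producer post
        pthread_mutex_lock(&mtx);
        items.pop();
        consumed++;
        pthread_mutex_unlock(&mtx);
        sem_post(&emptyCount);
    }
    return nullptr;
}

// Hypothetical driver: runs one producer and three consumers and
// returns the number of items consumed.
int run_claim_sketch() {
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_t p, c[3];
    pthread_create(&p, nullptr, producer, nullptr);
    for (pthread_t& t : c) pthread_create(&t, nullptr, consumer, nullptr);
    pthread_join(p, nullptr);
    for (pthread_t& t : c) pthread_join(t, nullptr);
    assert(items.empty());
    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    return consumed;
}
```

    Because a consumer only calls sem_wait() after it has claimed one of the NO_ITEMS slots, the number of waits can never exceed the number of posts the producer will make, so no consumer is left blocked.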

    Solutions

    There are multiple possible solutions. Here are some promising ones:

    1. Semaphores are not a particularly comfortable fit for the problem. You need a mutex anyway, and its usual counterpart for signaling is a condition variable. I would convert the two semaphores to two (or maybe just one) ordinary integer variables, and use a standard mutex + CV pattern in both producer and consumer. That would include adding mutex protection for the read of count in the consumer.

    2. On the other hand, if you are obligated to use semaphores, then you could

      • add appropriate mutex protection for the consumers' read of count
      • be sure to retain the consumers' test for whether they can actually consume an item after successfully decrementing the semaphore
      • have the main program post twice (that is, number of consumers - 1 times) to fillCount after joining the producer thread but before attempting to join the consumers. This will unblock any consumers that thought they would be able to consume an item but end up still waiting after the last item is consumed by another consumer.
    3. Or you could employ a hybrid: retain the emptyCount semaphore to limit the number of items waiting at any given time (instead of switching to a CV for that purpose), but switch to a mutex + CV pattern for managing the consumers.
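
    Option 1 might look roughly like the following sketch. It assumes a single producer, uses items.size() in place of a separate fill counter, and introduces the hypothetical names notFull, notEmpty, produced, consumed, and run_cv_sketch; the sizes are reduced so it runs quickly.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <stack>

// Reduced sizes for a quick run; the question uses 50 and 100.
static const std::size_t BUFFER_SIZE = 5;
static const int NO_ITEMS = 20;

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t notFull = PTHREAD_COND_INITIALIZER;   // hypothetical name
static pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER;  // hypothetical name
static std::stack<int> items;  // items.size() plays the role of fillCount
static int produced = 0;
static int consumed = 0;

static void* producer(void*) {
    for (int i = 0; i < NO_ITEMS; i++) {
        pthread_mutex_lock(&mtx);
        while (items.size() >= BUFFER_SIZE)
            pthread_cond_wait(&notFull, &mtx);
        items.push(i);
        produced++;
        // After the last item, wake every consumer so that idle ones can
        // re-check the predicate and exit; otherwise waking one is enough.
        if (produced == NO_ITEMS)
            pthread_cond_broadcast(&notEmpty);
        else
            pthread_cond_signal(&notEmpty);
        pthread_mutex_unlock(&mtx);
    }
    return nullptr;
}

static void* consumer(void*) {
    for (;;) {
        pthread_mutex_lock(&mtx);
        // All shared state is read under the mutex, so there is no data race.
        while (items.empty() && produced < NO_ITEMS)
            pthread_cond_wait(&notEmpty, &mtx);
        if (items.empty()) {  // producer finished and the buffer is drained
            pthread_mutex_unlock(&mtx);
            break;
        }
        items.pop();
        consumed++;
        pthread_mutex_unlock(&mtx);
        pthread_cond_signal(&notFull);
    }
    return nullptr;
}

// Hypothetical driver: runs one producer and three consumers and
// returns the number of items consumed.
int run_cv_sketch() {
    pthread_t p, c[3];
    pthread_create(&p, nullptr, producer, nullptr);
    for (pthread_t& t : c) pthread_create(&t, nullptr, consumer, nullptr);
    pthread_join(p, nullptr);
    for (pthread_t& t : c) pthread_join(t, nullptr);
    assert(items.empty());
    return consumed;
}
```

    The consumers' exit condition is part of the predicate they wait on, so a consumer can never be left waiting for an item that will not arrive.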
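
    If you have to stay with semaphores, the three steps of option 2 could be sketched as below. NUM_CONSUMERS, consumed, and run_semaphore_sketch are hypothetical names, the sizes are reduced so it runs quickly, and the sketch assumes unnamed POSIX semaphores are available (as on Linux).

```cpp
#include <cassert>
#include <pthread.h>
#include <semaphore.h>
#include <stack>

// Reduced sizes for a quick run; the question uses 50 and 100.
static const int BUFFER_SIZE = 5;
static const int NO_ITEMS = 20;
static const int NUM_CONSUMERS = 3;  // hypothetical constant

static sem_t fillCount, emptyCount;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static std::stack<int> items;
static int consumed = 0;

static void* producer(void*) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mtx);
        items.push(i);
        pthread_mutex_unlock(&mtx);
        sem_post(&fillCount);
    }
    return nullptr;
}

static void* consumer(void*) {
    for (;;) {
        // Step 1: read the shared counter under the mutex.
        pthread_mutex_lock(&mtx);
        bool more = consumed < NO_ITEMS;
        pthread_mutex_unlock(&mtx);
        if (!more) break;

        sem_wait(&fillCount);

        // Step 2: after the wait succeeds, still check that an item exists,
        // because the wake-up may come from one of main's extra posts.
        pthread_mutex_lock(&mtx);
        bool got = !items.empty();
        if (got) {
            items.pop();
            consumed++;
        }
        pthread_mutex_unlock(&mtx);
        if (got) sem_post(&emptyCount);
    }
    return nullptr;
}

// Hypothetical driver: returns the number of items consumed.
int run_semaphore_sketch() {
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_t p, c[NUM_CONSUMERS];
    pthread_create(&p, nullptr, producer, nullptr);
    for (pthread_t& t : c) pthread_create(&t, nullptr, consumer, nullptr);

    pthread_join(p, nullptr);
    // Step 3: the producer is done, so release any consumers that are
    // still parked in sem_wait() with no item left for them.
    for (int i = 0; i < NUM_CONSUMERS - 1; i++) sem_post(&fillCount);
    for (pthread_t& t : c) pthread_join(t, nullptr);

    assert(items.empty());
    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    return consumed;
}
```

    A consumer released by an extra post finds the buffer empty, loops back, sees that consumed has reached NO_ITEMS, and exits. The consumer that removes the last item never waits again, which is why NUM_CONSUMERS - 1 extra posts are enough.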