I'm trying solve a multiple producer-consumer problem using pthreads and semaphore but it always sticks at the last consume and halt. It will have NO_ITEMS of items and suppose buffer have size BUFFER_SIZE
This is my current code below.
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100
using namespace std;
void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();
sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;
stack<int> items;
static int count = 0;
int main()
{
sem_init(&fillCount, 0, 0);
sem_init(&emptyCount, 0, BUFFER_SIZE);
pthread_mutex_init(&mutex, nullptr);
pthread_t p1, c1, c2, c3;
pthread_create(&p1, nullptr, thread_producer, nullptr);
pthread_create(&c1, nullptr, thread_consumer, nullptr);
pthread_create(&c2, nullptr, thread_consumer, nullptr);
pthread_create(&c3, nullptr, thread_consumer, nullptr);
pthread_join(p1, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
sem_destroy(&fillCount);
sem_destroy(&emptyCount);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_consumer(void* args) {
while (count < NO_ITEMS) {
sem_wait(&fillCount);
pthread_mutex_lock(&mutex);
if (!items.empty() && count < NO_ITEMS - 1) {
removeItem();
}
count++;
pthread_mutex_unlock(&mutex);
sem_post(&emptyCount);
}
return nullptr;
}
void* thread_producer(void* args) {
for (int i = 0; i < NO_ITEMS; i++) {
sem_wait(&emptyCount);
pthread_mutex_lock(&mutex);
addItem(i);
// sleep(1);
pthread_mutex_unlock(&mutex);
sem_post(&fillCount);
}
return nullptr;
}
void addItem(int i) {
cout << "Produced: " << i << endl;
items.push(i);
}
void removeItem() {
cout << "Consumed: " << items.top() << endl;
items.pop();
}
This is the part of the output:
Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt
Your code has a logic problem. Suppose NO_ITEMS
is 100, and 99 have so far been consumed. Let two consumer threads arrive at the top of the while
loop at that point, and suppose that both read count
as 99 (but see below), and therefore enter the body of the loop. Both consumers will block on sem_wait()
, but there is at most one more item to be produced, so the producer will increment the semaphore at most once more, leaving at least one of the consumers blocked indefinitely.
Moreover, your thread_consumer()
function contains a data race, leaving your program's behavior undefined. Specifically, the read of shared variable count
in the while
condition is not properly synchronized. Although one cannot reliably predict how UB will manifest (else it would not be "undefined"), it is fairly common for unsynchronized accesses to manifest apparent failures of one thread to see shared-variable updates of other threads. Such a failure mode would explain your particular observed behavior all by itself.
Very likely, a correct fix to this synchronization problem would also fix the logic problem.
There are multiple possible solutions. Here are some promising ones:
Semaphores are not a particularly comfortable fit for the problem. You need a mutex anyway, and its usual counterpart for signaling is a condition variable. I would convert the two semaphores to two (or maybe just one) ordinary integer variable, and use a standard mutex + CV pattern in both producer and consumer. That would include adding mutex protection for the read of count
in the consumer.
On the other hand, if you are obligated to use semaphores, then you could
count
fillCount
after joining the producer thread but before attempting to join the consumers. This will unblock any consumers that thought they would be able to consume an item but end up still waiting after the last item is consumed by another consumer.Or you could employ a hybrid: retain the emptyCount
semaphore to limit the number of items waiting at any given time (instead of switching to a CV for that purpose), but switch to a mutex + CV pattern for managing the consumers.