I have to write a POSIX threads and semaphores based implementation of Producers and Consumers Problem for academical purposes. To check if the implementation is valid I sum all the 'goods' which are produced and all which are consumed. The problem is that second sums vary between subsequent executions of program and are not always equal to count of produced goods. I use cyclic buffer of fixed size for holding produced values, 2 semaphores for letting producers and consumers get into the critical section and 2 mutexes for accessing producers' and consumers' indexes. Here's my code:
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<time.h>
#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
int buffer[BUFFER_SIZE];
pthread_mutex_t write_index_mutex;
pthread_mutex_t read_index_mutex;
sem_t producer_semaphore;
sem_t consumer_semaphore;
int write_index = 0;
int read_index = 0;
int total_produced_sum = 0;
int total_consumed_sum = 0;
void* producer_thread(void* args)
{
int producer_id = *((int*)args);
free(args);
int my_write_index;
int iterations = ITERATIONS;
while(iterations--)
{
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
void* consumer_thread(void* args)
{
int my_read_index;
while(1)
{
sem_wait(&consumer_semaphore);
pthread_mutex_lock(&read_index_mutex);
my_read_index = read_index;
read_index = (read_index + 1) % BUFFER_SIZE;
total_consumed_sum += buffer[my_read_index];
pthread_mutex_unlock(&read_index_mutex);
sem_post(&producer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
int main()
{
int i;
int *id;
pthread_t producers[PRODUCERS_COUNT];
pthread_t consumers[CONSUMERS_COUNT];
sem_init(&producer_semaphore, 0, BUFFER_SIZE);
sem_init(&consumer_semaphore, 0, 0);
pthread_mutex_init(&write_index_mutex, NULL);
pthread_mutex_init(&read_index_mutex, NULL);
for(i = 0 ; i < PRODUCERS_COUNT ; i++)
{
id = (int*)malloc(sizeof(int));
*id = i+1;
pthread_create(&producers[i], 0, producer_thread, (void*)id);
}
for(i = 0; i < CONSUMERS_COUNT; i++)
{
pthread_create(&consumers[i], 0, consumer_thread, NULL);
}
for(i = 0; i < PRODUCERS_COUNT; i++)
{
pthread_join(producers[i], NULL);
}
while(1)
{
sem_getvalue(&consumer_semaphore, &i);
if(i == 0)
break;
}
printf("Goods produced: %d goods consumed: %d\n", total_produced_sum, total_consumed_sum);
return 0;
}
And here is some sample output for 10 runs of the same program, without re-compilation:
Goods produced: 55000 goods consumed: 54996
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55003
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55008
Goods produced: 55000 goods consumed: 54999
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Is there any logical or implementation error which causes these sums to be unequal?
Consider your producer:
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
// Producer #1 stops right here.
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
Let's suppose the following sequence happens:
my_write_index
is 0. It has already incremented write_index
to 1, but has not yet written anything to buffer[0]
.my_write_index
of 1, writes its id to buffer[1]
, and then posts to consumer_semaphore
.consumer_semaphore
since it was posted by producer #2. It tries to consume an element but the next read_index
is 0. Unfortunately, buffer[0]
has not yet been written to by producer #1 and has whatever unknown value was left over from before.To fix your code, you need to modify the producer to write to the buffer before releasing the mutex. In other words, switch the order of these two lines:
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;