c multithreading concurrency pthreads producer-consumer

Linux pthread Producers and Consumers


I have to write an implementation of the producer-consumer problem based on POSIX threads and semaphores for academic purposes. To check that the implementation is valid, I sum all the 'goods' that are produced and all that are consumed. The problem is that the consumed sum varies between runs of the program and is not always equal to the produced sum. I use a fixed-size circular buffer to hold the produced values, 2 semaphores to let producers and consumers into the critical section, and 2 mutexes to protect the producers' and consumers' indexes. Here's my code:

#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<time.h>
#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
int buffer[BUFFER_SIZE];
pthread_mutex_t write_index_mutex;
pthread_mutex_t read_index_mutex;

sem_t producer_semaphore;
sem_t consumer_semaphore;

int write_index = 0;
int read_index = 0;

int total_produced_sum = 0;
int total_consumed_sum = 0;

void* producer_thread(void* args)
{
    int producer_id = *((int*)args);
    free(args);
    int my_write_index;
    int iterations = ITERATIONS;
    while(iterations--)
    {
        sem_wait(&producer_semaphore);
        pthread_mutex_lock(&write_index_mutex);
        my_write_index = write_index;
        write_index = (write_index + 1) % BUFFER_SIZE;
        total_produced_sum += producer_id;
        pthread_mutex_unlock(&write_index_mutex);
        buffer[my_write_index] = producer_id;
        sem_post(&consumer_semaphore);
        usleep((rand() % 10)); 
    }
    return NULL;
}

void* consumer_thread(void* args)
{
    int my_read_index;
    while(1)
    {
        sem_wait(&consumer_semaphore);
        pthread_mutex_lock(&read_index_mutex);
        my_read_index = read_index;
        read_index = (read_index + 1) % BUFFER_SIZE;
        total_consumed_sum += buffer[my_read_index];
        pthread_mutex_unlock(&read_index_mutex);
        sem_post(&producer_semaphore);
        usleep((rand() % 10));
    }
    return NULL;
}

int main()
{
    int i;
    int *id;
    pthread_t producers[PRODUCERS_COUNT];
    pthread_t consumers[CONSUMERS_COUNT];
    sem_init(&producer_semaphore, 0, BUFFER_SIZE);
    sem_init(&consumer_semaphore, 0, 0);
    pthread_mutex_init(&write_index_mutex, NULL);
    pthread_mutex_init(&read_index_mutex, NULL);
    for(i = 0 ; i < PRODUCERS_COUNT ; i++)
    {
        id = (int*)malloc(sizeof(int));
        *id = i+1;
        pthread_create(&producers[i], 0, producer_thread, (void*)id);
    }
    for(i = 0; i < CONSUMERS_COUNT; i++)
    {
        pthread_create(&consumers[i], 0, consumer_thread, NULL);
    }
    for(i = 0; i < PRODUCERS_COUNT; i++)
    {
        pthread_join(producers[i], NULL);
    }
    while(1)
    {
        sem_getvalue(&consumer_semaphore, &i);
        if(i == 0)
            break;
    }
    printf("Goods produced: %d goods consumed: %d\n", total_produced_sum, total_consumed_sum);
    return 0;
}

And here is sample output from several runs of the same program, without recompilation:

Goods produced: 55000 goods consumed: 54996
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55003
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55008
Goods produced: 55000 goods consumed: 54999
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000

Is there a logical or implementation error that causes these sums to be unequal?


Solution

  • Incorrect producer

    Consider your producer:

        sem_wait(&producer_semaphore);
        pthread_mutex_lock(&write_index_mutex);
        my_write_index = write_index;
        write_index = (write_index + 1) % BUFFER_SIZE;
        total_produced_sum += producer_id;
        pthread_mutex_unlock(&write_index_mutex);
    
        // Producer #1 stops right here.
    
        buffer[my_write_index] = producer_id;
        sem_post(&consumer_semaphore);
    

    Let's suppose the following sequence happens:

    1. Producer #1 runs until the comment above and then stops. Suppose its my_write_index is 0. It has already incremented write_index to 1, but has not yet written anything to buffer[0].
    2. Now, producer #2 runs through the entire code. It has my_write_index of 1, writes its id to buffer[1], and then posts to consumer_semaphore.
    3. Next, consumer #1 acquires consumer_semaphore since it was posted by producer #2. It tries to consume an element but the next read_index is 0. Unfortunately, buffer[0] has not yet been written to by producer #1 and has whatever unknown value was left over from before.
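
    The three steps above can be replayed deterministically in a single thread, which makes the stale read easy to see. This is only a sketch: `replay_race`, the `STALE` marker and the `filled_slots` counter (standing in for consumer_semaphore) are hypothetical names introduced for illustration and are not part of the original program.

```c
#include <assert.h>
#include <stdio.h>

#define BUFFER_SIZE 2
#define STALE (-1) /* hypothetical marker for "slot not written yet" */

/* Replays steps 1-3 in one thread and returns the value the consumer reads. */
int replay_race(void)
{
    int buffer[BUFFER_SIZE] = { STALE, STALE };
    int write_index = 0;
    int read_index = 0;
    int filled_slots = 0; /* stands in for consumer_semaphore */

    /* Step 1: producer #1 reserves slot 0, then stops before writing. */
    int p1_index = write_index;
    write_index = (write_index + 1) % BUFFER_SIZE;

    /* Step 2: producer #2 reserves slot 1, writes its id and posts. */
    int p2_index = write_index;
    write_index = (write_index + 1) % BUFFER_SIZE;
    buffer[p2_index] = 2;
    filled_slots++;

    /* Step 3: consumer #1 sees the post but reads slot 0, still unwritten. */
    int consumed = STALE;
    if (filled_slots > 0) {
        filled_slots--;
        consumed = buffer[read_index];
        read_index = (read_index + 1) % BUFFER_SIZE;
    }

    /* Producer #1 resumes only now, too late for the consumer. */
    buffer[p1_index] = 1;

    printf("consumer read %d from slot 0 (next read_index: %d)\n",
           consumed, read_index);
    return consumed;
}
```

    In the real program the slot would hold a leftover value from an earlier iteration rather than a marker, which would explain why the consumed sum drifts both above and below the produced sum.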

    To fix your code, you need to modify the producer to write to the buffer before releasing the mutex. In other words, switch the order of these two lines:

        pthread_mutex_unlock(&write_index_mutex);
        buffer[my_write_index] = producer_id;
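
    With that swap applied, the whole program might look roughly like the sketch below. It assumes Linux with unnamed POSIX semaphores and compilation with `-pthread`. Two things differ from the original and are assumptions of this sketch: each consumer stops after a fixed share of the items (`ITEMS_PER_CONSUMER`, which relies on the counts dividing evenly) so that the consumers can be joined, and the `usleep` calls are dropped. `run_fixed_demo` is a hypothetical helper name.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
/* Assumption: the total item count divides evenly among the consumers. */
#define ITEMS_PER_CONSUMER (PRODUCERS_COUNT * ITERATIONS / CONSUMERS_COUNT)

static int buffer[BUFFER_SIZE];
static int write_index, read_index;
static long total_produced_sum, total_consumed_sum;
static pthread_mutex_t write_index_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t read_index_mutex = PTHREAD_MUTEX_INITIALIZER;
static sem_t producer_semaphore; /* counts free slots */
static sem_t consumer_semaphore; /* counts filled slots */

static void *producer_thread(void *arg)
{
    int producer_id = *(int *)arg;
    for (int n = 0; n < ITERATIONS; n++) {
        sem_wait(&producer_semaphore);
        pthread_mutex_lock(&write_index_mutex);
        /* The fix: fill the slot while still holding the mutex. */
        buffer[write_index] = producer_id;
        write_index = (write_index + 1) % BUFFER_SIZE;
        total_produced_sum += producer_id;
        pthread_mutex_unlock(&write_index_mutex);
        sem_post(&consumer_semaphore);
    }
    return NULL;
}

static void *consumer_thread(void *arg)
{
    (void)arg;
    for (int n = 0; n < ITEMS_PER_CONSUMER; n++) {
        sem_wait(&consumer_semaphore);
        pthread_mutex_lock(&read_index_mutex);
        total_consumed_sum += buffer[read_index];
        read_index = (read_index + 1) % BUFFER_SIZE;
        pthread_mutex_unlock(&read_index_mutex);
        sem_post(&producer_semaphore);
    }
    return NULL;
}

/* Hypothetical helper: runs one full round, returns 1 if the sums match. */
int run_fixed_demo(void)
{
    pthread_t producers[PRODUCERS_COUNT];
    pthread_t consumers[CONSUMERS_COUNT];
    int ids[PRODUCERS_COUNT];
    int rc;

    write_index = read_index = 0;
    total_produced_sum = total_consumed_sum = 0;
    rc = sem_init(&producer_semaphore, 0, BUFFER_SIZE);
    assert(rc == 0);
    rc = sem_init(&consumer_semaphore, 0, 0);
    assert(rc == 0);

    for (int i = 0; i < PRODUCERS_COUNT; i++) {
        ids[i] = i + 1;
        rc = pthread_create(&producers[i], NULL, producer_thread, &ids[i]);
        assert(rc == 0);
    }
    for (int i = 0; i < CONSUMERS_COUNT; i++) {
        rc = pthread_create(&consumers[i], NULL, consumer_thread, NULL);
        assert(rc == 0);
    }
    /* Join every thread so both sums are final before they are read. */
    for (int i = 0; i < PRODUCERS_COUNT; i++)
        pthread_join(producers[i], NULL);
    for (int i = 0; i < CONSUMERS_COUNT; i++)
        pthread_join(consumers[i], NULL);

    sem_destroy(&producer_semaphore);
    sem_destroy(&consumer_semaphore);
    printf("Goods produced: %ld goods consumed: %ld\n",
           total_produced_sum, total_consumed_sum);
    return total_produced_sum == total_consumed_sum;
}
```

    Calling `run_fixed_demo()` from `main` should report equal sums on every run, because a consumer can no longer be released for a slot that an earlier producer has reserved but not yet filled.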