Search code examples
cmultithreadingpthreadsproducer-consumer

Everytime works last consumer (producer consumer problem) in C


I have 10 consumer and 1 producer threads. Producer thread produce a random integer and insert it into buffer. Consumer threads take and remove an item from this buffer and print it.

Everything is works good(I am in, there is no endless loop or blocking stuffs). But I think, there is only one working consumer thread which is 10th(last) consumer thread. The other 9 consumer threads doesn't work. I realize that when I print the consumer id in consumer thread method. Why the other 9 consumer threads doesn't work and what can do for this kind of problem?

Below is my code:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
typedef int buffer_item;
#define BUFFER_SIZE 5

#define RAND_DIVISOR 100000000
#define TRUE 1

pthread_mutex_t mutex;

sem_t full, empty;

buffer_item buffer[BUFFER_SIZE];

int counter;

pthread_t tid;      
pthread_attr_t attr; 

void *producer(void *param); 
void *consumer(void *param); 

int insert_item(buffer_item item) {

   if(counter < BUFFER_SIZE) {
      buffer[counter] = item;
      counter++;
      return 0;
   }
   else { 
      return -1;
   }
}

int remove_item(buffer_item *item) {
   if(counter > 0) {
      *item = buffer[(counter-1)];
      counter--;
      return 0;
   }
   else {
      return -1;
   }
}

void initializeData() {

   pthread_mutex_init(&mutex, NULL);

   sem_init(&full, 0, 0);

   sem_init(&empty, 0, BUFFER_SIZE);

   pthread_attr_init(&attr);

   counter = 0;
}

void *producer(void *param) {
   buffer_item item;

   while(TRUE) {
      int rNum = rand() / RAND_DIVISOR;
      sleep(1);

      item = rand()%100;

      sem_wait(&empty);
      pthread_mutex_lock(&mutex);

      if(insert_item(item)) {
         fprintf(stderr, " Producer report error condition\n");
      }
      else {
         printf("producer produced %d\n", item);
      }
      pthread_mutex_unlock(&mutex);
      sem_post(&full);
   }
}

void *consumer(void *param) {
   buffer_item item;
   int* consumerID=(int*)param;

   while(TRUE) {
      int rNum = rand() / RAND_DIVISOR;
      sleep(1);

      sem_wait(&full);
      pthread_mutex_lock(&mutex);
      if(remove_item(&item)) {
         fprintf(stderr, "Consumer report error condition\n");
      }
      else {
         printf("consumer %d consumed %d\n" ,*consumerID, item);

      }
      pthread_mutex_unlock(&mutex);
      sem_post(&empty);
   }
}

int main(int argc, char *argv[]) {
   /* Loop counter */
   int i;


   int numProd = 1; /* Number of producer threads */
   int numCons = 10; /* Number of consumer threads */

   /* Initialize the app */
   initializeData();

   /* Create the producer threads */
   for(i = 0; i < numProd; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,producer,NULL);
    }

   /* Create the consumer threads */
   for(i = 0; i < numCons; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,consumer,(void*)&i);
   }

   /* Sleep for the specified amount of time in milliseconds */
   sleep(10);

   /* Exit the program */
   printf("Exit the program\n");
   exit(0);
}

My output is :

producer produced 27
consumer 10 consumed 27
producer produced 63
consumer 10 consumed 63
producer produced 26
consumer 10 consumed 26
producer produced 11
consumer 10 consumed 11
producer produced 29
consumer 10 consumed 29
producer produced 62
consumer 10 consumed 62
producer produced 35
consumer 10 consumed 35
producer produced 22
consumer 10 consumed 22
producer produced 67
consumer 10 consumed 67
Exit the program

Solution

  • Shawn has beat me to it, but yes he is correct. See the following implementation:

    /* Consumer Thread */
    void *consumer(void *param) {
        buffer_item item;
        int* consumerID=(int*)param;
    
        printf("consumer %d created\n" ,*consumerID);
    
        while(TRUE) {
            /* sleep for a random period of time */
            int rNum = rand() / RAND_DIVISOR;
            sleep(1);
    
            /* aquire the full lock */
            sem_wait(&full)%100;
            /* aquire the mutex lock */
            pthread_mutex_lock(&mutex);
            if(remove_item(&item)) {
                //fprintf(stderr, "Consumer report error condition: consumer %d item: %d\n", *consumerID, item);
            }
            else {
                printf("consumer %d consumed %d\n" ,*consumerID, item);
    
            }
            /* release the mutex lock */
            pthread_mutex_unlock(&mutex);
            /* signal empty */
            sem_post(&empty);
        }
    }
    
    long taskids[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    
        /* Create the consumer threads */
        for(i = 0; i < numCons; i++) {
            taskids[i] = i;
            /* Create the thread */
            pthread_create(&tid,&attr,consumer, taskids + i);
        }
    

    Which results in:

    consumer 0 created
    consumer 1 created
    consumer 2 created
    consumer 5 created
    consumer 3 created
    consumer 6 created
    consumer 9 created
    consumer 7 created
    consumer 8 created
    consumer 4 created
    producer produced 65
    consumer 9 consumed 65
    producer produced 57
    consumer 6 consumed 57
    producer produced 33
    consumer 5 consumed 33
    producer produced 57
    consumer 1 consumed 57
    producer produced 3
    consumer 3 consumed 3
    producer produced 81
    consumer 9 consumed 81
    producer produced 1
    consumer 5 consumed 1