Working on the Producer and Consumer problem for a class and having trouble just putting on the final touches. The problem I am encountering is that i think my mutex locks are not locking my threads out of the function. For example if I run the program and pass it the parameters 2 4 4 7 it would print 8 7's and then 2 seconds later it will print 8 8's and then 8 9's and so on. I have tried using trylock and moving around the semaphores but to no avail. Is there something that I am missing when that is causing none of the threads to be locked out?
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
typedef int buffer_item;
#define BUFFER_SIZE 5
#define TRUE 1
buffer_item START_NUMBER;
int counter;
int insert_item(buffer_item item);
int remove_item(buffer_item *item);
buffer_item buffer[BUFFER_SIZE];
void* producer(void *ptr);
void* consumer(void *ptr);
pthread_cond_t condc, condp;
pthread_mutex_t mutex;
int sleepTime, producerThreads, consumerThreads,a;
pthread_attr_t attr;
sem_t full, empty;
int insert_item(buffer_item item)
{
if(counter < BUFFER_SIZE) {
buffer[counter] = item;
counter++;
return 0;
}
else {
return -1;
}
}
int remove_item(buffer_item *item)
{
if(counter > 0) {
*item = buffer[(counter-1)];
counter--;
return 0;
}
else {
return -1;
}
}
void* producer(void *ptr) {
buffer_item item;
item = START_NUMBER;
while(TRUE) {
sleep(sleepTime);
sem_wait(&empty);
pthread_mutex_lock(&mutex);
if(insert_item(item)) {
fprintf(stderr, "error \n");
}
else {
printf("producer%u produced %d\n", (unsigned int)pthread_self(),item);
item++;
}
pthread_mutex_unlock(&mutex);
sem_post(&full);
}
}
void* consumer(void *ptr) {
buffer_item item;
while(TRUE) {
sleep(sleepTime);
sem_wait(&full);
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
fprintf(stderr, "error \n");
}
else {
printf("consumer%u consumed %d\n", (unsigned int)pthread_self(),item);
}
pthread_mutex_unlock(&mutex);
sem_post(&empty);
}
}
void initializeData() {
pthread_mutex_init(&mutex, NULL);
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
pthread_attr_init(&attr);
counter = 0;
}
int main(int argc, char **argv) {
sleepTime = atoi(argv[1]);
producerThreads = atoi(argv[2]);
consumerThreads = atoi(argv[3]);
START_NUMBER = atoi(argv[4]);
initializeData();
pthread_t pro, con;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condc, NULL);
pthread_cond_init(&condp, NULL);
for(a=0; a< consumerThreads;a++)
pthread_create(&con, NULL, consumer, NULL);
for(a=0;a<producerThreads;a++)
pthread_create(&pro, NULL, producer, NULL);
pthread_join(con, NULL);
pthread_join(pro, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&condc);
pthread_cond_destroy(&condp);
sleep(sleepTime);
}
Thanks for the help Dmitri, after moving around the lines you told me about and some discussion with one of my friends I finally got it to start outputting the right numbers!
This is the output i have been looking for
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
typedef int buffer_item;
#define BUFFER_SIZE 5
#define TRUE 1
buffer_item START_NUMBER;
buffer_item item;
int counter;
int insert_item(buffer_item item);
int remove_item(buffer_item *item);
buffer_item buffer[BUFFER_SIZE];
void* producer(void *ptr);
void* consumer(void *ptr);
pthread_cond_t condc, condp;
pthread_mutex_t mutex;
int sleepTime, producerThreads, consumerThreads, a, item;
pthread_attr_t attr;
sem_t full, empty;
int insert_item(buffer_item item)
{
if(counter < BUFFER_SIZE) {
buffer[counter] = item;
counter++;
return 0;
}
else {
return -1;
}
}
int remove_item(buffer_item *item)
{
if(counter > 0) {
*item = buffer[(counter-1)];
printf("consumer%u consumed %d\n", (unsigned int)pthread_self(),buffer[counter-1]);
counter--;
return 0;
}
else {
return -1;
}
}
void* producer(void *ptr) {
while(TRUE) {
sleep(sleepTime);
sem_wait(&empty);
pthread_mutex_lock(&mutex);
if(insert_item(START_NUMBER)) {
fprintf(stderr, "error \n");
}
else {
printf("producer%u produced %d\n", (unsigned int)pthread_self(),START_NUMBER);
START_NUMBER++;
}
pthread_mutex_unlock(&mutex);
sem_post(&full);
}
}
void* consumer(void *ptr) {
while(TRUE) {
sleep(sleepTime);
sem_wait(&full);
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
fprintf(stderr, "error \n");
}
else {
// printf("consumer%u consumed %d\n", (unsigned int)pthread_self(),&START_NUMBER);
}
pthread_mutex_unlock(&mutex);
sem_post(&empty);
}
}
void initializeData() {
pthread_mutex_init(&mutex, NULL);
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
pthread_attr_init(&attr);
counter = 0;
}
int main(int argc, char **argv) {
sleepTime = atoi(argv[1]);
producerThreads = atoi(argv[2]);
consumerThreads = atoi(argv[3]);
START_NUMBER = atoi(argv[4]);
item = START_NUMBER;
initializeData();
pthread_t pro, con;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condc, NULL);
pthread_cond_init(&condp, NULL);
for(a=0; a< consumerThreads;a++)
pthread_create(&con, NULL, consumer, NULL);
for(a=0;a<producerThreads;a++)
pthread_create(&pro, NULL, producer, NULL);
pthread_join(con, NULL);
pthread_join(pro, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&condc);
pthread_cond_destroy(&condp);
sleep(sleepTime);
}