I wrote my first multithreaded program, and for the most part it works. The shared buffer array is initially populated with -1, indicating to the producers that a slot is empty and needs to be filled. The producers then populate the shared buffer with random values 1 - 10, taking turns filling the buffer. A producer then signals the consumers that it has filled an element of the buffer and that the element is ready to be consumed. There are 120 elements the producers need to fill, and the consumers should consume each entry. The program works perfectly fine until it gets to item 110. It then freezes and I can't figure out why. How do I fix this?
Here is a snippet of the output.
Item: 85, Consuming value 8, my thread id is: 1216
Item: 86, Consuming value 7, my thread id is: 298320
Signal
Producer thread 231296 and value: 0
Producer thread 297552 and value: 2
Producer thread 298576 and value: 0
Item: 87, Consuming value 9, my thread id is: 297808
Signal
Producer thread 960 and value: 3
Producer thread 298064 and value: 2
Item: 88, Consuming value 3, my thread id is: 231744
Item: 89, Consuming value 7, my thread id is: 298320
Item: 90, Consuming value 3, my thread id is: 1216
Item: 91, Consuming value 7, my thread id is: 298832
Signal
Producer thread 231296 and value: 3
Producer thread 297552 and value: 8
Producer thread 298576 and value: 6
Item: 92, Consuming value 2, my thread id is: 297808
Signal
Producer thread 960 and value: 9
Producer thread 298064 and value: 7
Item: 93, Consuming value 5, my thread id is: 298320
Item: 94, Consuming value 2, my thread id is: 298832
Item: 95, Consuming value 0, my thread id is: 1216
Item: 96, Consuming value 2, my thread id is: 231744
This is my code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>
#define THREADS 5    /* number of producer threads, and also of consumer threads */
#define ELEMENTS 120 /* number of slots in the shared buffer */

/* Handles of the worker threads that main() creates and later joins. */
pthread_t tid_producer[THREADS];
pthread_t tid_consumer[THREADS];

/* Thread entry points. */
void *produce(void *arg);
void *consume(void *arg);

/* File-scope counters; none of the functions shown in this listing use them. */
int value = 0;
int saveValue = 0;
int producerCount = 0;
int consumerCount = ELEMENTS;
/*
 * Producer-side shared state: the buffer itself and the cursor of the next
 * slot to fill. produce() takes `mutex` before touching the other fields.
 */
struct {
    pthread_mutex_t mutex;
    int index;              /* next slot a producer will fill */
    int value;              /* last value generated by a producer */
    int MyShBuff[ELEMENTS]; /* -1 = empty, -2 = consumed, otherwise a produced value */
} add = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .index = 0,
    .value = 0,
    /* MyShBuff starts zeroed here; main() overwrites every slot with -1. */
};
/*
 * Hand-off state between producers and consumers: a condition variable, the
 * mutex used with it, and the bookkeeping counters.
 */
struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nready;  /* number ready for consumer */
    int value;   /* initialised to -2; not used by the functions in this listing */
    int empty;   /* items still to be consumed; starts at ELEMENTS */
    int counter; /* index of the next slot a consumer reads */
} nready = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .cond = PTHREAD_COND_INITIALIZER,
    .nready = 0,
    .value = -2,
    .empty = ELEMENTS,
    /* counter is not listed, so it starts at 0 */
};
/*
 * Entry point: marks every buffer slot empty, starts THREADS producers and
 * THREADS consumers, waits for all of them, then releases the
 * synchronisation objects.
 *
 * Returns 0 on success. If a thread cannot be created or joined, the error
 * is reported on stderr and the process exits with EXIT_FAILURE.
 */
int main(void)
{
    /* Ready the buffer for the producers: -1 marks a slot as empty. */
    for (int i = 0; i < ELEMENTS; i++) {
        add.MyShBuff[i] = -1;
    }

    for (int j = 0; j < THREADS; j++) {
        /* pthread functions return the error number directly, not via errno. */
        int rc = pthread_create(&tid_producer[j], NULL, &produce, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (producer %d): %s\n", j, strerror(rc));
            exit(EXIT_FAILURE);
        }
        rc = pthread_create(&tid_consumer[j], NULL, &consume, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (consumer %d): %s\n", j, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    /* Wait for all producers and consumers. */
    for (int k = 0; k < THREADS; k++) {
        int rc = pthread_join(tid_producer[k], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join (producer %d): %s\n", k, strerror(rc));
            exit(EXIT_FAILURE);
        }
        rc = pthread_join(tid_consumer[k], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join (consumer %d): %s\n", k, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    /* Clean up and exit. Every worker has been joined, so a plain return is
     * enough; the former pthread_exit()/exit() pair only added unreachable code. */
    pthread_mutex_destroy(&nready.mutex);
    pthread_mutex_destroy(&add.mutex);
    pthread_cond_destroy(&nready.cond);
    return 0;
}
/*
 * Producer thread body.
 *
 * Repeatedly claims the next slot of add.MyShBuff under add.mutex, stores a
 * random value in the range 0..9 if the slot is still marked empty (-1), and
 * then bumps nready.nready under nready.mutex. When nready.nready was 0, the
 * waiting consumers are woken with a broadcast first.
 *
 * arg is unused. Returns NULL once add.index has reached ELEMENTS.
 */
void *produce(void *arg)
{
    (void)arg;

    for (;;) {
        pthread_mutex_lock(&add.mutex);
        if (add.index >= ELEMENTS) {
            /* Every slot has been handed out: release the lock and finish. */
            pthread_mutex_unlock(&add.mutex);
            return NULL;
        }
        if (add.MyShBuff[add.index] == -1) {
            add.value = rand() % 10;
            add.MyShBuff[add.index] = add.value;
            /* pthread_t is an opaque type, so convert it explicitly to match
             * the conversion specifier (assumes an arithmetic or pointer
             * pthread_t, as on the common POSIX platforms). */
            printf("Producer thread %lu and value: %d\n",
                   (unsigned long)pthread_self(), add.MyShBuff[add.index]);
            add.index++;
        }
        pthread_mutex_unlock(&add.mutex);

        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0) {
            /* Consumers only wait while the count is 0, so wake them here. */
            pthread_cond_broadcast(&nready.cond);
            printf("Signal\n");
        }
        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);
    }
}
/*
 * Consumer thread body (the version that hangs, kept exactly as posted).
 *
 * Intended behaviour: wait until a producer has published an item, print it,
 * mark its slot as consumed (-2) and repeat until nready.empty reaches 0.
 *
 * NOTE(review): the locking below does not implement that intent; see the
 * inline notes.
 */
void *consume(void *arg)
{
/* Locked once here, but unlocked at the bottom of EVERY outer iteration. */
pthread_mutex_lock(&nready.mutex);
/* NOTE(review): if nready.empty is already 0 on entry, the function returns
 * with nready.mutex still locked. From the second iteration on, this test and
 * everything below run without the mutex held. */
while(nready.empty != 0)
{
/* NOTE(review): when nready.nready is already non-zero this loop body is
 * skipped entirely, so the code falls through to nready.nready-- without
 * printing an item or reducing nready.empty. */
while (nready.nready == 0)
{
/* NOTE(review): after the first outer iteration the mutex is no longer held
 * when this call is reached. */
pthread_cond_wait(&nready.cond,&nready.mutex);
/* The item is consumed right after the wait returns, without re-testing
 * nready.nready first. */
pthread_mutex_lock(&add.mutex);
/* NOTE(review): pthread_self() returns a pthread_t but is printed with %d. */
printf(" Item: %d, Consuming value %d, my thread id is: %d\n", nready.counter, add.MyShBuff[nready.counter], pthread_self());
/* -2 marks the slot as consumed. */
add.MyShBuff[nready.counter] = -2;
pthread_mutex_unlock(&add.mutex);
nready.counter++;
nready.empty--;
}
nready.nready--;
pthread_mutex_unlock(&nready.mutex);
}
return NULL;
}
As pointed out in the comments, your use of the mutex in consume is wrong (imagine the while condition is false on entry: the thread will leave without releasing the mutex). Moreover, I would suggest limiting the number of variables that hold the same information (empty, counter), since they make the flow hard to follow. Finally, you also have a bug in your use of pthread_cond_wait: see "Does pthread_cond_wait(&cond_t, &mutex); unlock and then lock the mutex?" or any other question that explains the use of pthread_cond_wait. They stress that the predicate MAY be true upon returning, but is not guaranteed to be, which means you must test it again before starting any computation that requires the predicate to be true.
I rewrote your code a bit to make it work. I hope I did not miss anything else, but this should help you go further:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>
#define THREADS 5    /* producers to start; the same number of consumers is started */
#define ELEMENTS 120 /* capacity of the shared buffer */

/* One thread id per producer and per consumer; filled in and joined by main(). */
pthread_t tid_producer[THREADS];
pthread_t tid_consumer[THREADS];

/* Worker entry points, defined after main(). */
void *produce(void *arg);
void *consume(void *arg);

/* Left over from the original program; the code in this listing never touches them. */
int value = 0;
int saveValue = 0;
int producerCount = 0;
int consumerCount = ELEMENTS;
/*
 * State owned by the producers: the shared buffer and the position of the
 * next slot to fill. All accesses in produce() and consume() happen with
 * `mutex` held.
 */
struct {
    pthread_mutex_t mutex;
    int index;              /* slot the next producer iteration will fill */
    int value;              /* most recently generated value */
    int MyShBuff[ELEMENTS]; /* -1 = not yet produced, -2 = consumed */
} add = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .index = 0,
    .value = 0,
};
/*
 * Producer/consumer hand-off: producers increment `nready` and broadcast on
 * `cond`; consumers wait on `cond` and keep the remaining counters up to date.
 * All fields are accessed with `mutex` held.
 */
struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nready;  /* number ready for consumer */
    int value;   /* initialised to -2; not read by the code in this listing */
    int empty;   /* items still to be consumed */
    int counter; /* buffer index of the next item to consume */
} nready = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .cond = PTHREAD_COND_INITIALIZER,
    .nready = 0,
    .value = -2,
    .empty = ELEMENTS,
    .counter = 0,
};
/*
 * Entry point: initialises the shared buffer, runs THREADS producers and
 * THREADS consumers to completion and destroys the synchronisation objects.
 *
 * argc and argv are accepted but unused.
 * Returns 0 on success; exits with EXIT_FAILURE (after a message on stderr)
 * if a thread cannot be created or joined.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    /* Ready the buffer for the producers: -1 means "slot not yet filled". */
    for (int i = 0; i < ELEMENTS; i++) {
        add.MyShBuff[i] = -1;
    }

    for (int j = 0; j < THREADS; j++) {
        /* A failed pthread_create leaves the thread id unusable, so joining
         * it later would be invalid: stop right away instead. */
        int err = pthread_create(&tid_producer[j], NULL, &produce, NULL);
        if (err != 0) {
            fprintf(stderr, "cannot start producer %d: %s\n", j, strerror(err));
            exit(EXIT_FAILURE);
        }
        err = pthread_create(&tid_consumer[j], NULL, &consume, NULL);
        if (err != 0) {
            fprintf(stderr, "cannot start consumer %d: %s\n", j, strerror(err));
            exit(EXIT_FAILURE);
        }
    }

    /* Wait for all producers and consumers. */
    for (int k = 0; k < THREADS; k++) {
        int err = pthread_join(tid_producer[k], NULL);
        if (err != 0) {
            fprintf(stderr, "cannot join producer %d: %s\n", k, strerror(err));
            exit(EXIT_FAILURE);
        }
        err = pthread_join(tid_consumer[k], NULL);
        if (err != 0) {
            fprintf(stderr, "cannot join consumer %d: %s\n", k, strerror(err));
            exit(EXIT_FAILURE);
        }
    }

    /* Clean up and exit. */
    pthread_mutex_destroy(&nready.mutex);
    pthread_mutex_destroy(&add.mutex);
    pthread_cond_destroy(&nready.cond);
    return 0;
}
/*
 * Producer thread body.
 *
 * While slots remain (add.index < ELEMENTS), fills the slot at add.index with
 * a random value in 0..9 if it is still marked empty (-1), then publishes one
 * item by incrementing nready.nready. A broadcast is sent whenever the count
 * was 0, because that is the only state in which consumers wait.
 *
 * add.mutex is held whenever the loop condition is evaluated; it is released
 * before nready.mutex is taken, so this function never holds both locks.
 *
 * dummy is unused. Always returns NULL.
 */
void *produce(void *dummy)
{
    (void)dummy;

    pthread_mutex_lock(&add.mutex);
    while (add.index < ELEMENTS) {
        if (add.MyShBuff[add.index] == -1) {
            add.value = rand() % 10;
            add.MyShBuff[add.index] = add.value;
            /* pthread_t is opaque; convert it explicitly so the argument
             * matches the conversion specifier (assumes an arithmetic or
             * pointer pthread_t, as on the common POSIX platforms). */
            printf("Producer thread %lu and value: %d\n",
                   (unsigned long)pthread_self(), add.MyShBuff[add.index]);
            add.index++;
        }
        pthread_mutex_unlock(&add.mutex);

        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0) {
            pthread_cond_broadcast(&nready.cond);
            printf("Signal\n");
        }
        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);

        /* Re-take the buffer lock before the loop condition reads add.index. */
        pthread_mutex_lock(&add.mutex);
    }
    pthread_mutex_unlock(&add.mutex);
    return NULL;
}
/*
 * Consumer thread body.
 *
 * Holds nready.mutex for the whole function, apart from the time spent
 * blocked in pthread_cond_wait(), which releases it while waiting. Each pass
 * waits until either an item is ready (nready.nready != 0) or nothing is left
 * to consume (nready.empty == 0). It then prints the item at nready.counter,
 * marks its slot consumed (-2) and updates the counters.
 *
 * add.mutex is taken while nready.mutex is held. produce() never holds both
 * locks at once, so the two functions cannot deadlock on lock order.
 *
 * dummy is unused. Always returns NULL.
 */
void *consume(void *dummy)
{
    (void)dummy;

    pthread_mutex_lock(&nready.mutex);
    for (;;) {
        /* Re-test the predicate after every wake-up: the wait may return
         * without an item being available for this thread, and it is also
         * the place where a consumer notices that it is time to leave. */
        while (nready.nready == 0 && nready.empty != 0) {
            pthread_cond_wait(&nready.cond, &nready.mutex);
        }
        if (nready.empty == 0) {
            break;
        }

        pthread_mutex_lock(&add.mutex);
        /* pthread_t is opaque; convert it explicitly so the argument matches
         * the conversion specifier (assumes an arithmetic or pointer
         * pthread_t, as on the common POSIX platforms). */
        printf(" Item: %d, Consuming value %d, my thread id is: %lu\n",
               nready.counter, add.MyShBuff[nready.counter],
               (unsigned long)pthread_self());
        add.MyShBuff[nready.counter] = -2;
        pthread_mutex_unlock(&add.mutex);

        nready.counter++;
        nready.empty--;
        nready.nready--;
    }
    pthread_mutex_unlock(&nready.mutex);
    return NULL;
}