Here I have created 2 producer threads and 2 consumer threads. They put and take out values only in and from one shared queue.
Problem is that first producer does fill in and then gets into waiting mode.
After that no other thread runs. Explain please what point am I missing.
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList <int> sharedQueue;
/*
* `sharedQueue`'s size is assumed to be 10 ATM.
* `sharedQueue` is supposed to be shared among two threads.
* Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
int sizeOfSharedQueue;
// This function is run by the thread `Producer A`.
void *threadProducerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer A: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer A has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `ProducerB`.
void *threadProducerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer B: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer B has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer A`.
void *threadConsumerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer B`.
void *threadConsumerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
int main (int argc, char *argv[]) {
numberOfActiveProducers = 2;
numberOfActiveConsumers = 2;
sizeOfSharedQueue = 10;
// `sharedQueue` initialization by 0.
for (int i = 0; i < sizeOfSharedQueue; i++) {
sharedQueue.push_back (0);
}
// Producer threads creation and joining
pthread_t producerA;
pthread_t producerB;
if (pthread_create (&producerA, NULL, threadProducerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerA, NULL)) {
fprintf (stderr, "Error joining thread Producer A\n");
return 2;
}
if (pthread_create (&producerB, NULL, threadProducerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerB, NULL)) {
fprintf (stderr, "Error joining thread Producer B\n");
return 2;
}
// Consumer threads creation and joining
pthread_t consumerA;
pthread_t consumerB;
if (pthread_create (&consumerA, NULL, threadConsumerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer A\n");
return 1;
}
if (pthread_join (consumerA, NULL)) {
fprintf (stderr, "Error joining thread Consumer A\n");
return 2;
}
if (pthread_create (&consumerB, NULL, threadConsumerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer B\n");
return 1;
}
if (pthread_join (consumerB, NULL)) {
fprintf (stderr, "Error joining thread Consumer B\n");
return 2;
}
QApplication a (argc, argv);
MainWindow w;
w.show ();
return a.exec ();
}
The problem is that there are pthread_join
calls after each pthread_create
call in main
. pthread_join
by definition will block until the thread it is waiting for exits. Since none of the child threads exit the result is that the first pthread_join
call will block indefinetely and hence none of the subsequent pthread_create
calls are executed.
One fix is to just remove all the pthread_join
calls. pthread_join
is typically used to wait for and get the return status of child threads or to synchronise the main thread so that it does not exit before the child thread has completed. So those pthread_join
calls are are not actually needed in this case because the child threads do not exit and the main thread calls a.exec()
which performs the task of preventing it from exiting.
Unrelated to the actual question but I see that you have essentially duplicated the producer and consumer code for each thread. That is unnecessary as the same thread function can be passed to multiple pthread_create
calls (as long as there are no static variables). If you want to differentiate the instances for debugging purposes either use the thread id or pass in a different arg
to each thread for identification.