There is one shared queue, with two producer threads and two consumer threads.
Below is the output, and I have pasted the program after that:
My problem is that the print statement `qDebug () << "\nConsumer: " << tId;`
is at the top of the consumer loop, yet its output is not printed first. I want to understand why.
Producer 140588830992128 couldn't push any data since queue was already full. Length of queue is: 10
Removed by thread Consumer: 140588814206720 , Length of queue is: 9
Removed by thread Consumer: 140588814206720 , Length of queue is: 8
Removed by thread Consumer: 140588814206720 , Length of queue is: 7
Removed by thread Consumer: 140588814206720 , Length of queue is: 6
Removed by thread Consumer: 140588814206720 , Length of queue is: 5
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 4
Removed by thread Consumer: 140588814206720 , Length of queue is: 3
Removed by thread Consumer: 140588814206720 , Length of queue is: 2
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 1
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 0
Here is the program that I wrote:
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
// Mutex that both thread functions lock around every access to `sharedQueue`.
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
// Condition variable used together with `mutexVariable`: producers wait on it
// when the queue is full, and consumers signal it.
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
// Number of producer threads; `main` assigns 2.
int numberOfActiveProducers;
// Number of consumer threads; `main` assigns 2.
int numberOfActiveConsumers;
// The queue shared by all producer and consumer threads.
QList <int> sharedQueue;
/*
* `sharedQueue`'s size is assumed to be 10 at the moment.
* `sharedQueue` is shared among the producer and consumer threads
* (`main` starts two of each).
* Producer threads put 1's into it, and consumer threads remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
// Intended capacity of `sharedQueue`; `main` assigns 10.
int sizeOfSharedQueue;
// This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
pthread_t tId = pthread_self();
qDebug () << "\nProducer: " << tId;
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer " << tId << ": " << "Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer " << tId << " has no work to do since quque is full, and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the `Consumer` threads.
/*
 * Endlessly drains `sharedQueue`. On each pass, while holding `mutexVariable`:
 *   - if the queue holds elements, every one of them is removed;
 *   - if the queue is empty, `conditionVariable` is signalled so that a
 *     producer blocked on a full queue can continue.
 *
 * arg: unused.
 * Returns NULL, but in practice never returns because the loop has no exit.
 */
void *consumerThreadFunction (void *arg) {
    Q_UNUSED (arg);
    const pthread_t tId = pthread_self ();
    while (1) {
        qDebug () << "\nConsumer: " << tId;
        pthread_mutex_lock (&mutexVariable);
        if (sharedQueue.isEmpty ()) {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
        }
        else {
            // Loop on emptiness rather than on an index compared with length():
            // an index that grows while the length shrinks stops half-way and
            // leaves elements behind.
            while (!sharedQueue.isEmpty ()) {
                sharedQueue.pop_front ();
                qDebug () << "\nRemoved by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
            }
        }
        pthread_mutex_unlock (&mutexVariable);
    }
    return NULL;
}
int main (int argc, char *argv[]) {
numberOfActiveProducers = 2;
numberOfActiveConsumers = 2;
sizeOfSharedQueue = 10;
// Producer threads creation
pthread_t producerA;
pthread_t producerB;
if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer B\n");
return 1;
}
// Consumer threads creation
pthread_t consumerA;
pthread_t consumerB;
if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer A\n");
return 1;
}
if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer B\n");
return 1;
}
// Joining every thread
if (pthread_join (producerA, NULL)) {
fprintf (stderr, "Error joining thread Producer A\n");
return 2;
}
if (pthread_join (producerB, NULL)) {
fprintf (stderr, "Error joining thread Producer B\n");
return 2;
}
if (pthread_join (consumerB, NULL)) {
fprintf (stderr, "Error joining thread Consumer B\n");
return 2;
}
if (pthread_join (consumerA, NULL)) {
fprintf (stderr, "Error joining thread Consumer A\n");
return 2;
}
QApplication a (argc, argv);
MainWindow w;
w.show ();
return a.exec ();
}
I have added a screenshot of a run that uses std::cerr.
It makes no difference to the output:
On my system I do see the output you expect, starting with:
Producer: 0x700000081000
Consumer: 0x700000187000
Producer: 0x700000104000
Consumer: 0x70000020a000
But this is just luck, because it seems that qDebug() is not guaranteed to be thread-safe. One solution might be to only do logging while you hold the mutex. Or use a different logging facility.
Note that writing to std::cerr is thread-safe, but you may end up with interleaved output if you write multiple tokens using `<<`.
So you can format your log lines first, and then write each one "atomically" via std::cerr.write().