Search code examples
c++multithreadingpthreadsproducer-consumer

Irregular print statements in multiple producer consumer program


Shared queue is one. Producers and Consumers are two, each.

Below is the output, and I have pasted the program after that:

My problem is that the print statement qDebug () << "\nConsumer: " << tId;, is at the top still it didn't print first. I want to understand why.


Producer  140588830992128  couldn't push any data since queue was already full. Length of queue is:  10
Removed by thread Consumer:  140588814206720 , Length of queue is:  9
Removed by thread Consumer:  140588814206720 , Length of queue is:  8
Removed by thread Consumer:  140588814206720 , Length of queue is:  7
Removed by thread Consumer:  140588814206720 , Length of queue is:  6
Removed by thread Consumer:  140588814206720 , Length of queue is:  5
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  4
Removed by thread Consumer:  140588814206720 , Length of queue is:  3
Removed by thread Consumer:  140588814206720 , Length of queue is:  2
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  1
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  0

Here is the program that I wrote:

#include "mainwindow.h"
#include <QApplication>

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>

pthread_mutex_t mutexVariable     = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  conditionVariable = PTHREAD_COND_INITIALIZER;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueue;
/*
 * `sharedQueue`'s size is assumed to be 10 ATM.
 * `sharedQueue` is supposed to be shared among two threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

//  This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        pthread_t tId = pthread_self();
        qDebug () << "\nProducer: " << tId;

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) {
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer " << tId << ": " << "Length of queue is: " << sharedQueue.length ();
        }
        else {
            qDebug () << "\nProducer " << tId << " has no work to do since quque is full, and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }

        pthread_mutex_unlock (&mutexVariable);
    }

    return NULL;
}

//  This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        pthread_t tId = pthread_self ();
        qDebug () << "\nConsumer: " << tId;

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) {
            for (int u = 0; u < sharedQueue.length (); u++) {
                sharedQueue.pop_front ();
                qDebug () << "\nRemoved by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
            }
        }
        else {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
        }

        pthread_mutex_unlock (&mutexVariable);
    }   
    return NULL;
}

int main (int argc, char *argv[]) {
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // Producer threads creation
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer B\n");
        return 1;
    }

    // Consumer threads creation
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }

    if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    // Joining every thread
    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }

    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }

    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }

    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();
}

Added the screenshot with std::cerr. No difference in output:

enter image description here


Solution

  • On my system I do see the output you expect, starting with:

    Producer:  0x700000081000 
    Consumer:  0x700000187000 
    Producer:  0x700000104000 
    Consumer:  0x70000020a000 
    

    But this is just luck, because it seems that qDebug() is not guaranteed to be thread-safe. One solution might be to only do logging while you hold the mutex. Or use a different logging facility. Note that writing to std::cerr is thread-safe, but you may end up with interleaving if you write multiple tokens using <<. So you can format your log lines first, and write them "atomically" via std::cerr.write().