Search code examples
cmultithreadingqtproducer-consumer

Multiple producers and consumers but one shared reource - Only one thread is running


Here I have created 2 producer threads and 2 consumer threads. They put and take out values only in and from one shared queue.

Problem is that first producer does fill in and then gets into waiting mode.

After that no other thread runs. Explain please what point am I missing.

#include "mainwindow.h"
#include <QApplication>

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>


pthread_mutex_t mutexVariable     = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  conditionVariable = PTHREAD_COND_INITIALIZER;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueue;
/*
 * `sharedQueue`'s size is assumed to be 10 ATM.
 * `sharedQueue` is supposed to be shared among two threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

//  This function is run by the thread `Producer A`.
void *threadProducerAFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        qDebug () << "\nProducer A";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) {
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer A: Length of queue is: " << sharedQueue.length ();
        }
        else {
            qDebug () << "\nProducer A has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }

        pthread_mutex_unlock (&mutexVariable);
    }

    return NULL;
}

//  This function is run by the thread `ProducerB`.
void *threadProducerBFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        qDebug () << "\nProducer B";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) {
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer B: Length of queue is: " << sharedQueue.length ();
        }
        else {
            qDebug () << "\nProducer B has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }

        pthread_mutex_unlock (&mutexVariable);
    }

    return NULL;
}

//  This function is run by the thread `Consumer A`.
void *threadConsumerAFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        qDebug () << "\nConsumer A";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) {
            sharedQueue.pop_front ();
            qDebug () << "\nRemoved by thread Consumer A. Length of queue is: " << sharedQueue.length ();
        }
        else {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer A. Length of queue is: " << sharedQueue.length ();
        }

        pthread_mutex_unlock (&mutexVariable);
    }   
    return NULL;
}

//  This function is run by the thread `Consumer B`.
void *threadConsumerBFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        qDebug () << "\nConsumer B";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) {
            sharedQueue.pop_front ();
            qDebug () << "\nRemoved by thread Consumer B. Length of queue is: " << sharedQueue.length ();
        }
        else {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer B. Length of queue is: " << sharedQueue.length ();
        }

        pthread_mutex_unlock (&mutexVariable);
    }
    return NULL;
}

int main (int argc, char *argv[]) {
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // `sharedQueue` initialization by 0.
    for (int i = 0; i < sizeOfSharedQueue; i++) {
        sharedQueue.push_back (0);
    }

    // Producer threads creation and joining
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, threadProducerAFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }

    if (pthread_create (&producerB, NULL, threadProducerBFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }

    // Consumer threads creation and joining
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, threadConsumerAFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }

    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    if (pthread_create (&consumerB, NULL, threadConsumerBFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();
}

Solution

  • The problem is that there are pthread_join calls after each pthread_create call in main. pthread_join by definition will block until the thread it is waiting for exits. Since none of the child threads exit the result is that the first pthread_join call will block indefinetely and hence none of the subsequent pthread_create calls are executed.

    One fix is to just remove all the pthread_join calls. pthread_join is typically used to wait for and get the return status of child threads or to synchronise the main thread so that it does not exit before the child thread has completed. So those pthread_join calls are are not actually needed in this case because the child threads do not exit and the main thread calls a.exec() which performs the task of preventing it from exiting.

    Unrelated to the actual question but I see that you have essentially duplicated the producer and consumer code for each thread. That is unnecessary as the same thread function can be passed to multiple pthread_create calls (as long as there are no static variables). If you want to differentiate the instances for debugging purposes either use the thread id or pass in a different arg to each thread for identification.