c multithreading pthreads producer-consumer

Slow pthread consumer


I've implemented a solution to the producer/consumer problem in C using pthreads and semaphores.

My main thread is the producer and I launch N consumer threads.

My code is:

typedef struct
{
    int buf[BUFSIZE];     /* shared var */
    int in;               /* buf[in%BUFSIZE] is the first empty slot */
    int out;              /* buf[out%BUFSIZE] is the first full slot */
    sem_t full;           /* keep track of the number of full spots */
    sem_t empty;          /* keep track of the number of empty spots */
    pthread_mutex_t mutex;          /* enforce mutual exclusion to shared data */
} CONSUMER_STRUCT;

CONSUMER_STRUCT shared;
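The initialisation of `shared` is not shown in the question. A minimal sketch of what it presumably looks like, assuming a `BUFSIZE` of 10 and a hypothetical helper named `shared_init()` (neither appears in the original code): `full` starts at 0 because no slot is filled yet, and `empty` starts at `BUFSIZE` because every slot is free.

```c
#include <pthread.h>
#include <semaphore.h>
#include <string.h>

#define BUFSIZE 10   /* assumed value; the question does not state it */

typedef struct
{
    int buf[BUFSIZE];
    int in;
    int out;
    sem_t full;
    sem_t empty;
    pthread_mutex_t mutex;
} CONSUMER_STRUCT;

CONSUMER_STRUCT shared;

/* Hypothetical helper: returns 0 on success, -1 on failure. */
int shared_init(void)
{
    memset(shared.buf, 0, sizeof shared.buf);
    shared.in = 0;
    shared.out = 0;

    /* Second argument 0: the semaphore is shared between threads, not processes. */
    if (sem_init(&shared.full, 0, 0) != 0)          /* no full slots yet */
        return -1;
    if (sem_init(&shared.empty, 0, BUFSIZE) != 0)   /* every slot is empty */
        return -1;
    if (pthread_mutex_init(&shared.mutex, NULL) != 0)
        return -1;
    return 0;
}
```

With these starting values the producer can deposit up to `BUFSIZE` items before `sem_wait(&shared.empty)` blocks, and consumers block on `sem_wait(&shared.full)` until the first item arrives.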

This is the code for each of my consumer threads:

void *Consumer(void *arg)
{
    int fd, workerID, i, hit=0;

    workerID = *(int *)arg;

    for (;;) {
        sem_wait(&shared.full);
        pthread_mutex_lock(&shared.mutex);
        fd = shared.buf[shared.out];
        printf("\n[C%d] Consumed. I got  %d ...Buffer value: %d at position %d\n\n\n", workerID, fd, shared.buf[shared.out], shared.out);
        ftp(fd, hit);
        shared.buf[shared.out] = 0;
        shared.out = (shared.out+1)%BUFSIZE;
        fflush(stdout);
        printf("\n\n\n\nBuffer state:\n\n\n\n");
        for (i = 0; i < BUFSIZE; i++) {
            //printf("%d ", shared.buf[i]);
        }
        /* Release the buffer */
        pthread_mutex_unlock(&shared.mutex);
        /* Increment the number of empty slots */
        sem_post(&shared.empty);
        hit++;
    }
    return NULL;
}

And this is the code for my producer thread:

item = socketfd;

sem_wait(&shared.empty);
pthread_mutex_lock(&shared.mutex);

shared.buf[shared.in] = item;

shared.in = (shared.in + 1) % BUFSIZE;
fflush(stdout);

pthread_mutex_unlock(&shared.mutex);
sem_post(&shared.full);

Everything works correctly, but serving 22 files takes around 20 seconds, whereas creating one thread per request takes around 2 seconds! The consumers seem to be executing one at a time, and I want them to run concurrently.

Am I doing something wrong in my implementation approach?


Solution

  • For those who might come here with a similar problem, here's the fix.

    Thanks to @Martin James and @EOF.

    void *Consumer(void *arg)
    {
        int fd, slot, workerID, hit = 0;

        workerID = *(int *)arg;

        for (;;) {
            /* Wait until at least one slot is full */
            sem_wait(&shared.full);

            /* Hold the mutex only while touching the shared buffer */
            pthread_mutex_lock(&shared.mutex);
            slot = shared.out;
            fd = shared.buf[slot];
            shared.buf[slot] = 0;
            shared.out = (shared.out + 1) % BUFSIZE;
            pthread_mutex_unlock(&shared.mutex);

            /* Increment the number of empty slots */
            sem_post(&shared.empty);

            /* Print from local copies; shared.out may already have moved on */
            printf("\n[C%d] Consumed. I got %d from position %d\n", workerID, fd, slot);
            fflush(stdout);

            /* The slow work runs outside the lock */
            ftp(fd, hit);
            hit++;
        }
        return NULL;
    }
    

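    The problem was that each consumer held the mutex while calling ftp(), so only one thread could serve a file at a time and the others waited for the lock. The fix is to unlock the mutex as soon as the slot has been read from the buffer and only then call ftp(), so the mutex protects the buffer bookkeeping and nothing else.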
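To see the effect in isolation, here is a small self-contained sketch that times the same bounded buffer in both modes. It is not the original server: `run_demo()`, `put()`, `slow_work()`, the `STOP` sentinel, and the constants are all made up for illustration, and a 20 ms `nanosleep()` stands in for `ftp()`.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#define BUFSIZE  8    /* assumed buffer size */
#define NWORKERS 4    /* assumed number of consumers */
#define STOP    (-1)  /* hypothetical sentinel telling a worker to exit */

static struct {
    int buf[BUFSIZE];
    int in, out;
    sem_t full, empty;
    pthread_mutex_t mutex;
} q;

static int hold_lock;  /* 1 = do the slow work while holding the mutex */

/* Stand-in for ftp(): sleep for 20 ms. */
static void slow_work(void)
{
    struct timespec ts = { 0, 20 * 1000 * 1000 };
    nanosleep(&ts, NULL);
}

static void put(int item)
{
    sem_wait(&q.empty);
    pthread_mutex_lock(&q.mutex);
    q.buf[q.in] = item;
    q.in = (q.in + 1) % BUFSIZE;
    pthread_mutex_unlock(&q.mutex);
    sem_post(&q.full);
}

static void *worker(void *arg)
{
    int *served = arg;  /* per-thread counter, read by run_demo after join */

    for (;;) {
        sem_wait(&q.full);
        pthread_mutex_lock(&q.mutex);
        int item = q.buf[q.out];
        q.out = (q.out + 1) % BUFSIZE;
        if (hold_lock && item != STOP)
            slow_work();            /* slow mode: work inside the lock */
        pthread_mutex_unlock(&q.mutex);
        sem_post(&q.empty);

        if (item == STOP)
            break;
        if (!hold_lock)
            slow_work();            /* fast mode: work outside the lock */
        (*served)++;
    }
    return NULL;
}

/* Pushes nitems through the buffer; returns elapsed milliseconds. */
double run_demo(int nitems, int lock_during_work, int *total_served)
{
    pthread_t tid[NWORKERS];
    int served[NWORKERS] = { 0 };
    struct timespec t0, t1;

    assert(total_served != NULL);
    q.in = q.out = 0;
    hold_lock = lock_during_work;
    sem_init(&q.full, 0, 0);
    sem_init(&q.empty, 0, BUFSIZE);
    pthread_mutex_init(&q.mutex, NULL);

    clock_gettime(CLOCK_MONOTONIC, &t0);
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tid[i], NULL, worker, &served[i]);
    for (int i = 0; i < nitems; i++)
        put(i);
    for (int i = 0; i < NWORKERS; i++)
        put(STOP);                  /* one sentinel per worker */

    *total_served = 0;
    for (int i = 0; i < NWORKERS; i++) {
        pthread_join(tid[i], NULL);
        *total_served += served[i];
    }
    clock_gettime(CLOCK_MONOTONIC, &t1);

    pthread_mutex_destroy(&q.mutex);
    sem_destroy(&q.full);
    sem_destroy(&q.empty);
    return (t1.tv_sec - t0.tv_sec) * 1e3 + (t1.tv_nsec - t0.tv_nsec) / 1e6;
}
```

Calling `run_demo(16, 1, &n)` and then `run_demo(16, 0, &n)` from a `main()` and comparing the returned times shows the difference. With the lock held during the work, the 16 sleeps cannot overlap, so the run takes at least 16 × 20 ms = 320 ms. With the work outside the lock, the four workers sleep concurrently and the run should finish in roughly a quarter of that, though exact timings depend on the machine.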