
Is this the correct way to use pthread?


I am trying to write a multi-threaded HTTP server. main() hands the client_sock returned by accept() to one of the worker threads; if no worker thread is available, it waits until one is. I am not allowed to call accept() in the worker threads. Here is a portion of my code so far. My questions are:

  1. Do I need two pthread mutexes and two condition variables, as I have right now?
  2. Do I need to call pthread_mutex_lock or pthread_mutex_unlock at all in these cases?
  3. If I wanted to add a mutex lock when files are being created on the server, would I have to create another mutex variable or would one of the existing ones work?
#include <iostream>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>

#define SIZE 1024

struct shared_data
{
    int redundancy;
    int client_sock;
    int working_threads;
    int dispatch_ready;
    pthread_mutex_t* dispatch_mutex;
    pthread_mutex_t* worker_mutex;
    pthread_cond_t* dispatch_cond;
    pthread_cond_t* worker_cond;
};

void* receiveAndSend(void* obj)
{
    struct shared_data* data = (struct shared_data*) obj;

    int bytes;
    char buff[SIZE + 1];

    while(1)
    {
        while(!data->dispatch_ready)
        {
            pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
        }
        data->dispatch_ready = 0;

        data->working_threads++;

        client_sock = data->client_sock;

        bytes = recv(client_sock, buff, SIZE, 0);

        // do work

        data->working_threads--;
        pthread_cond_signal(data->worker_cond);
    }
}

int main(int argc, char* argv[])
{
    if(argc < 2 || argc > 6)
    {
        char msg[] = "Error: invalid arg amount\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    char* addr = NULL;
    unsigned short port = 80;
    int num_threads = 4;
    int redundancy = 0;
    char opt;

    while((opt = getopt(argc, argv, "N:r")) != -1)
    {
        if(opt == 'N')
        {
            num_threads = atoi(optarg);

            if(num_threads < 1)
            {
                char msg[] = "Error: invalid input for -N argument\n";
                write(STDERR_FILENO, msg, strlen(msg));
                exit(1);
            }
        }
        else if(opt == 'r')
        {
            redundancy = 1;
        }
        else
        {
            // error (getopt automatically sends an error message)
            return 1;
        }
    }

    // non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
    // optind is the next index of argv after all options
    if(optind < argc)
    {
        addr = argv[optind];
        optind++;
    }

    if(optind < argc)
    {
        port = atoi(argv[optind]);
    }

    if(addr == NULL)
    {
        char msg[] = "Error: no address specified\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = getaddr(addr);
    serv_addr.sin_port = htons(port);

    int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
    if(serv_sock < 0)
    {
        err(1, "socket()");
    }

    if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
    {
        err(1, "bind()");
    }

    if(listen(serv_sock, 500) < 0)
    {
        err(1, "listen()");
    }

    // Connecting with a client
    struct sockaddr client_addr;
    socklen_t client_addrlen;

    pthread_mutex_t dispatch_mutex;
    pthread_mutex_init(&dispatch_mutex, NULL);
    pthread_mutex_t worker_mutex;
    pthread_mutex_init(&worker_mutex, NULL);
    pthread_cond_t dispatch_cond;
    pthread_cond_init(&dispatch_cond, NULL);
    pthread_cond_t worker_cond;
    pthread_cond_init(&worker_cond, NULL);

    struct shared_data data;

    data.redundancy = redundancy;
    data.dispatch_ready = 0;
    data.working_threads = 0;
    data.dispatch_mutex = &dispatch_mutex;
    data.worker_mutex = &worker_mutex;
    data.dispatch_cond = &dispatch_cond;
    data.worker_cond = &worker_cond;

    pthread_t* threads = new pthread_t[num_threads];
    for (int i = 0; i < num_threads; i++)
    {
        pthread_create(&threads[i], NULL, receiveAndSend, &data);
    }

    while(1)
    {
        data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);

        while(data.working_threads == num_threads)
        {
            pthread_cond_wait(data.worker_cond, data.worker_mutex);
        }

        data.dispatch_ready = 1;
        pthread_cond_signal(data.dispatch_cond);
    }

    return 0;
}


Solution

  • There are many very basic bugs in your program, which pretty clearly demonstrate that you don't understand locks and condition variables (or appropriate use of pointers).

    A lock protects some shared data. You have exactly one shared data item (the shared_data struct), so you need exactly one lock (mutex) to protect it.

    A condition variable indicates that some condition is true. Reasonable conditions for your use case would be worker_available and work_available. (Naming your condition variables dispatch_cond and worker_cond does not help clarity.)

    A condition variable is always associated with a mutex, but you don't need two separate mutexes just because you have two condition variables.
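As a rough sketch of that layout, assuming the two conditions named above: one mutex guards every shared field, and both condition variables are waited on with that same mutex. The struct name `dispatch_state`, the `has_work` flag, and the helper `dispatch_state_init` are hypothetical and do not come from the question.

```c
#include <pthread.h>

/* Hypothetical layout: one mutex protects every field below. */
struct dispatch_state {
    pthread_mutex_t lock;             /* the only mutex */
    pthread_cond_t  work_available;   /* main -> workers: a socket is waiting */
    pthread_cond_t  worker_available; /* workers -> main: a worker went idle */
    int client_sock;                  /* meaningful only while has_work != 0 */
    int has_work;
    int working_threads;
};

/* Returns 0 on success, or the first pthread error code encountered. */
int dispatch_state_init(struct dispatch_state *s)
{
    int rc;

    if ((rc = pthread_mutex_init(&s->lock, NULL)) != 0)
        return rc;
    if ((rc = pthread_cond_init(&s->work_available, NULL)) != 0)
        return rc;
    if ((rc = pthread_cond_init(&s->worker_available, NULL)) != 0)
        return rc;

    s->client_sock = -1;
    s->has_work = 0;
    s->working_threads = 0;
    return 0;
}
```

Every read or write of `client_sock`, `has_work`, or `working_threads` would then happen with `lock` held, whichever condition variable the thread is about to wait on or signal.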


    On to the bugs.

    This code is obviously buggy:

        while(1)
        {
            while(!data->dispatch_ready)
            {
                pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
            }
    

    From man pthread_cond_wait:

    "atomically release mutex and cause the calling thread to block on the condition variable cond"

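    How can this thread release a mutex that it never acquired? POSIX requires the caller to hold the mutex when calling pthread_cond_wait; with a default mutex, doing otherwise is undefined behavior.
    Also, how can this thread read data->dispatch_ready (shared with other threads) without acquiring a mutex?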
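A minimal sketch of the usual wait pattern, assuming a single flag guarded by a single mutex: the waiter locks first, tests the predicate in a loop, and only then calls `pthread_cond_wait`; the signaller changes the flag under the same lock. The names `gate`, `gate_wait`, and `gate_open` are made up for illustration.

```c
#include <pthread.h>

/* Hypothetical stand-in for the question's dispatch_ready flag. */
struct gate {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int ready;
};

/* Waiter: acquire the lock, test the predicate in a loop, then consume it. */
void gate_wait(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    while (!g->ready) {
        /* Releases g->lock while blocked and re-acquires it before
         * returning; the loop re-checks because wakeups can be spurious. */
        pthread_cond_wait(&g->cond, &g->lock);
    }
    g->ready = 0;                    /* consumed while still holding the lock */
    pthread_mutex_unlock(&g->lock);
}

/* Signaller: change the predicate under the same lock, then signal. */
void gate_open(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    g->ready = 1;
    pthread_cond_signal(&g->cond);
    pthread_mutex_unlock(&g->lock);
}
```

Because the flag is tested and cleared with the lock held, only one waiter can observe `ready == 1` for each call to `gate_open`, even if several are woken.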

    This code:

        struct shared_data data;
    
        data.redundancy = redundancy;
        data.dispatch_ready = 0;
        data.working_threads = 0;
        data.dispatch_mutex = &dispatch_mutex;
        data.worker_mutex = &worker_mutex;
        data.dispatch_cond = &dispatch_cond;
        data.worker_cond = &worker_cond;
    

    isn't buggy, but it adds unnecessary indirection. You could make the mutexes and condition variables members of shared_data instead of pointers, like so:

    struct shared_data
    {
        int redundancy;
        int client_sock;
        int working_threads;
        int dispatch_ready;
        pthread_mutex_t dispatch_mutex;
        pthread_mutex_t worker_mutex;
        pthread_cond_t dispatch_cond;
        pthread_cond_t worker_cond;
    };
    

    And here is the most subtle bug I noticed:

            data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
    ...
            data.dispatch_ready = 1;
            pthread_cond_signal(data.dispatch_cond);
    

    Here, you will wake up at least one of the threads waiting for dispatch_cond, but may wake up more than one. If more than one thread is awoken, they all will proceed to recv on the same client_sock with potentially disastrous results.

    Update:

    "How do I fix this?"

    Probably the best and most performant way to fix this is to have a queue of "work items" (using e.g. a doubly linked list with head and tail pointers), protected by a lock.

    The main thread would add elements at the tail (while holding the lock) and signal a "not empty" condition variable.

    Worker threads would remove the head element (while holding the lock).
    Worker threads would block on the "not empty" condition variable when the queue is empty.

    When the queue is full (all workers are busy), the main thread may continue adding elements, or it may block waiting for a worker to become available, or it can return "429 Too Many Requests" to the client.
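The queue described above could look roughly like this. It is a sketch under assumptions, not a definitive implementation: it swaps the linked list for a fixed-size ring buffer to stay short, it picks the "block when full" option, and the names `work_queue`, `queue_init`, `queue_push`, `queue_pop`, and `QUEUE_CAP` are all hypothetical.

```c
#include <pthread.h>

#define QUEUE_CAP 16                 /* hypothetical capacity */

struct work_queue {
    pthread_mutex_t lock;            /* protects every field below */
    pthread_cond_t  not_empty;       /* signalled after a push */
    pthread_cond_t  not_full;        /* signalled after a pop */
    int items[QUEUE_CAP];            /* pending client sockets */
    int head, tail, count;
};

/* Returns 0 on success, or a pthread error code. */
int queue_init(struct work_queue *q)
{
    int rc;

    q->head = q->tail = q->count = 0;
    rc = pthread_mutex_init(&q->lock, NULL);
    if (rc == 0) rc = pthread_cond_init(&q->not_empty, NULL);
    if (rc == 0) rc = pthread_cond_init(&q->not_full, NULL);
    return rc;
}

/* Main thread: append at the tail, blocking while the queue is full. */
void queue_push(struct work_queue *q, int client_sock)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[q->tail] = client_sock;
    q->tail = (q->tail + 1) % QUEUE_CAP;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Worker thread: remove from the head, blocking while the queue is empty. */
int queue_pop(struct work_queue *q)
{
    int client_sock;

    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    client_sock = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return client_sock;
}
```

In this arrangement main() would call `queue_push` with each socket returned by accept(), and each worker would loop on `queue_pop` before calling recv(). Each socket is removed from the queue exactly once, so two workers never end up reading from the same client_sock.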