Search code examples
cpthreadsmemset

Problems with simple threadpool and TCP listener in C


I've implement a multi threaded TCP listener in c with a simple thread pool logic and I have two main problem after I tested with a for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000; in multiple terminals:

  1. I would get 'pthread: Cannot allocate memory' after over 30000 packets even if I use pthread_exit() and pthread_join (with 5gb free memory and ps -hH shows only 4 to 5 threads) it will work only if I use pthread_detach(pthread_self()).
  2. When I use pthread_detach(pthread_self()) I would get some extra character at my output, after I use memset(&buffer,0,sizeof buffer) on local buffer variable in handle_client function everything works fine but I guess there is a problem in my code which cause this.
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define handle_err(msg)     \
    do                      \
    {                       \
        perror(msg);        \
        exit(EXIT_FAILURE); \
    } while (0)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define MAX_ACCEPT_THREAD 200
uint empty_thread_sp = 1;
int empty_thread[MAX_ACCEPT_THREAD];
pthread_t *process_thread;
struct ip_args
{
    int port;
    int ipaddr;
};
struct client_thread
{
    unsigned int cfd;
    int thread_number;
};
int create_thread_pool()
{
    int i = 0;
    for (i = 0; i < MAX_ACCEPT_THREAD; i++)
    {
        empty_thread[i] = i;
    }
    process_thread =malloc(MAX_ACCEPT_THREAD * sizeof *process_thread);
}
int get_thread()
{

    if (empty_thread_sp < MAX_ACCEPT_THREAD)
    {
        pthread_mutex_lock(&lock);
        empty_thread_sp++;
        printf("ThreadNO_in_Get=%d\n", empty_thread[empty_thread_sp]);
        pthread_mutex_unlock(&lock);
        return empty_thread[empty_thread_sp];
    }
    else
    {
        return get_thread();
    }
}
int release_thread(struct client_thread *ct)
{
    if (empty_thread_sp > 0)
    {
        pthread_mutex_lock(&lock);
        empty_thread[--empty_thread_sp] = ct->thread_number;
        printf("ThreadNO_in_R=%d\n", empty_thread[empty_thread_sp]);
        pthread_mutex_unlock(&lock);
    }
    else
    {
        return 0;
    }
}
void *handle_client(void *arg)
{
    pthread_detach(pthread_self());
    int no;
    char buffer[1024];
    //TODO is this ok?
    memset(&buffer,0,sizeof buffer);
    struct client_thread *ct = arg;
    int n;
    n = read(ct->cfd, buffer, 1024);
    printf("%s", buffer);
    write(ct->cfd, &buffer, strlen(buffer));
    close(ct->cfd);
    release_thread(ct);
    no = ct->thread_number;
    free(ct);
    //TODO the following lines do not work
    //pthread_exit(NULL);
    //pthread_join(process_thread[no],NULL);
    
}
void *start_listen(void *args)
{
    struct ip_args *listen_addr = args;
    unsigned int sfd;
    struct client_thread *ct; 
    struct sockaddr_in my_addr;
    sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1)
        handle_err("socket");
    printf("%d\n", sfd);
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
    my_addr.sin_port = htons(listen_addr->port);
    struct sockaddr_in peer_addr;
    if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
        handle_err("bind");
    if (listen(sfd, 10) == -1)
        handle_err("listen");

    while (1)
    {
        ct= malloc(sizeof *ct); 
        socklen_t addr_size = sizeof peer_addr;
        ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
        if (ct->cfd == 1)
            handle_err("accept");

        ct->thread_number = 0;
        ct->thread_number = get_thread();

        printf("Thread_number = %d\n", ct->thread_number);

        if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0)
            handle_err("pthread");
    }
}
int main()
{
    create_thread_pool();
    /* 1- socket 2-bind 3-listen 4-accept*/
    struct ip_args listen_addr1, listen_addr2, control_addr;
    listen_addr1.ipaddr = INADDR_ANY;
    listen_addr1.port = 2000;
    listen_addr2.ipaddr = INADDR_ANY;
    listen_addr2.port = 3000;
    control_addr.ipaddr = INADDR_LOOPBACK;
    control_addr.port = 57000;
    pthread_t listen_thread[3];
    pthread_create(&listen_thread[0], NULL, start_listen, (void *)&listen_addr1);
    pthread_create(&listen_thread[1], NULL, start_listen, (void *)&listen_addr2);
    //pthread_create(&listen_thread[2], NULL, start_listen, (void *)&control_addr);
    start_listen((void *)&control_addr);
    return 0;
}

Edited: @terehpp Sample works fine but It only create 3 process thread, Based on a @Martin James and @terehpp suggestion I should change my design. And about my second issue based on @terehpp hint I should use memset(&buffer,0,sizeof buffer) on a local non-static variable.


Solution

  • If we talk only about pthread_join problem and don't touch design issues:

    1. You should invoke pthread_join as close as you can to pthread_create (If you have joinable thread). I remove pthread_detach and invoke pthread_join after pthread_create in the start_listen function. Also you should free resources in the function where you allocated them. So I replace free(ct); from handle_client to start_listen

    Something like this

         if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0) {                 
             handle_err("pthread");                                                                                      
         } else {                                                                                                        
             pthread_join(process_thread[ct->thread_number], NULL);
             // As long as we have threadpool we should reallocate resources for 
             // process_thread[ct->thread_number]
             // because after pthread_join it will be free automaticly
             //I just catch this error with coredump, so.....
             process_thread[ct->thread_number] = malloc(sizeof(pthread_t));
             // But I am not really sure this good solution, I think malloc at this place far far away from }good solution                                                                                    
         }                   }                                                                                            
         free(ct);
    

    So, now it works without crash for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000; in multiple terminals

    1. I am not able to reproduce your second issue. Code with pthread_detach works well on my machine.

    Fixes after segmentation fault comment

    I change get_thread() and release_thread() methods, also some changes to the start_listener and handler method:

     int empty_thread_sp = MAX_ACCEPT_THREAD - 1;
     ...
     int get_thread()
     {
        while(1) {//My eyes bleeding, too....
            pthread_mutex_lock(&lock);
            if (empty_thread_sp >= 0) {
                int thread;
                printf("ThreadNO_in_Get=%d, index=%d\n",empty_thread[empty_thread_sp], empty_thread_sp);
                thread = empty_thread[empty_thread_sp];
                empty_thread_sp -= 1;
    
                pthread_mutex_unlock(&lock);
                return thread;
             }
    
             pthread_mutex_unlock(&lock);
        }
     }
    
     int release_thread(struct client_thread *ct)
     {
         pthread_mutex_lock(&lock);
         empty_thread_sp += 1;
         empty_thread[empty_thread_sp] = ct->thread_number;
         printf("ThreadNO_in_R=%d, index=%d\n", empty_thread[empty_thread_sp], empty_thread_sp);
         pthread_mutex_unlock(&lock);
         return 0;
     }
    
     void *handle_client(void *arg)
     {
         int no;
         char buffer[1024];
         memset(&buffer,0,sizeof buffer);
         struct client_thread *ct = arg;
         int n;
         n = read(ct->cfd, buffer, 1024);
         printf("%s", buffer);
         write(ct->cfd, &buffer, strlen(buffer));
         close(ct->cfd);
         return NULL;
      }
      
      void *start_listen(void *args)
      {
         struct ip_args *listen_addr = args;
         unsigned int sfd;
         struct client_thread *ct; 
         struct sockaddr_in my_addr;
         sfd = socket(AF_INET, SOCK_STREAM, 0);
         if (sfd == -1)
             handle_err("socket");
         printf("%d\n", sfd);
         memset(&my_addr, 0, sizeof(my_addr));
         my_addr.sin_family = AF_INET;
         my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
         my_addr.sin_port = htons(listen_addr->port);
         struct sockaddr_in peer_addr;
         if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
             handle_err("bind");
         if (listen(sfd, 10) == -1)
             handle_err("listen");
    
         while (1)
         {
             ct= malloc(sizeof *ct); 
             socklen_t addr_size = sizeof peer_addr;
             ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
             if (ct->cfd == 1)
                 handle_err("accept");
    
             ct->thread_number = get_thread();
    
             printf("Thread_number = %d\n", ct->thread_number);
    
             if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0){
                 handle_err("pthread");
             } else {
                 pthread_join(process_thread[ct->thread_number], NULL);
                 release_thread(ct);
             }
             free(ct);
         }
      }