I've implement a multi threaded TCP listener in c with a simple thread pool logic and I have two main problem after I tested with a for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;
in multiple terminals:
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define handle_err(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define MAX_ACCEPT_THREAD 200
uint empty_thread_sp = 1;
int empty_thread[MAX_ACCEPT_THREAD];
pthread_t *process_thread;
struct ip_args
{
int port;
int ipaddr;
};
struct client_thread
{
unsigned int cfd;
int thread_number;
};
int create_thread_pool()
{
int i = 0;
for (i = 0; i < MAX_ACCEPT_THREAD; i++)
{
empty_thread[i] = i;
}
process_thread =malloc(MAX_ACCEPT_THREAD * sizeof *process_thread);
}
int get_thread()
{
if (empty_thread_sp < MAX_ACCEPT_THREAD)
{
pthread_mutex_lock(&lock);
empty_thread_sp++;
printf("ThreadNO_in_Get=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
return empty_thread[empty_thread_sp];
}
else
{
return get_thread();
}
}
int release_thread(struct client_thread *ct)
{
if (empty_thread_sp > 0)
{
pthread_mutex_lock(&lock);
empty_thread[--empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
}
else
{
return 0;
}
}
void *handle_client(void *arg)
{
pthread_detach(pthread_self());
int no;
char buffer[1024];
//TODO is this ok?
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
release_thread(ct);
no = ct->thread_number;
free(ct);
//TODO the following lines do not work
//pthread_exit(NULL);
//pthread_join(process_thread[no],NULL);
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = 0;
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0)
handle_err("pthread");
}
}
int main()
{
create_thread_pool();
/* 1- socket 2-bind 3-listen 4-accept*/
struct ip_args listen_addr1, listen_addr2, control_addr;
listen_addr1.ipaddr = INADDR_ANY;
listen_addr1.port = 2000;
listen_addr2.ipaddr = INADDR_ANY;
listen_addr2.port = 3000;
control_addr.ipaddr = INADDR_LOOPBACK;
control_addr.port = 57000;
pthread_t listen_thread[3];
pthread_create(&listen_thread[0], NULL, start_listen, (void *)&listen_addr1);
pthread_create(&listen_thread[1], NULL, start_listen, (void *)&listen_addr2);
//pthread_create(&listen_thread[2], NULL, start_listen, (void *)&control_addr);
start_listen((void *)&control_addr);
return 0;
}
Edited: @terehpp Sample works fine but It only create 3 process thread, Based on a @Martin James and @terehpp suggestion I should change my design. And about my second issue based on @terehpp hint I should use memset(&buffer,0,sizeof buffer) on a local non-static variable.
If we talk only about pthread_join problem and don't touch design issues:
free(ct);
from handle_client
to start_listen
Something like this
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0) {
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
// As long as we have threadpool we should reallocate resources for
// process_thread[ct->thread_number]
// because after pthread_join it will be free automaticly
//I just catch this error with coredump, so.....
process_thread[ct->thread_number] = malloc(sizeof(pthread_t));
// But I am not really sure this good solution, I think malloc at this place far far away from }good solution
} }
free(ct);
So, now it works without crash for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000; in multiple terminals
Fixes after segmentation fault comment
I change get_thread() and release_thread() methods, also some changes to the start_listener and handler method:
int empty_thread_sp = MAX_ACCEPT_THREAD - 1;
...
int get_thread()
{
while(1) {//My eyes bleeding, too....
pthread_mutex_lock(&lock);
if (empty_thread_sp >= 0) {
int thread;
printf("ThreadNO_in_Get=%d, index=%d\n",empty_thread[empty_thread_sp], empty_thread_sp);
thread = empty_thread[empty_thread_sp];
empty_thread_sp -= 1;
pthread_mutex_unlock(&lock);
return thread;
}
pthread_mutex_unlock(&lock);
}
}
int release_thread(struct client_thread *ct)
{
pthread_mutex_lock(&lock);
empty_thread_sp += 1;
empty_thread[empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d, index=%d\n", empty_thread[empty_thread_sp], empty_thread_sp);
pthread_mutex_unlock(&lock);
return 0;
}
void *handle_client(void *arg)
{
int no;
char buffer[1024];
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
return NULL;
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0){
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
release_thread(ct);
}
free(ct);
}
}