Search code examples
clinuxsocketsnetwork-programmingepoll

Epoll tcp server grinds to a halt when accepting connections


I'm trying to connect 10,000+ tcp clients to my tcp server below. After 1-5 seconds I'm able to get between 200 and 5000 clients connected before the code grinds to a halt and hangs without terminating. I cant find any further documentation on this and the gprof profiler isnt able to collect any data.

Server:

#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <errno.h>
#include <iostream>

#ifndef MAXEVENTS
#define MAXEVENTS 64
#endif
#ifndef TX_BUF_SIZE
#define TX_BUF_SIZE (65535)
#endif
#ifndef RX_BUF_SIZE
#define RX_BUF_SIZE (65535)
#endif

char buf[RX_BUF_SIZE];

void user_recv_handler(int efd, int fd, char * buf, int len)
{
    int s = -1;
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLOUT | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
    //assert(s!=-1);
    if(s==-1)
    {
        fprintf(stderr, "epoll out error.\n");
        return;
    }
}

struct addrinfo* tcpipv4_getaddrinfo(char* port)
{
    struct addrinfo hints;
    struct addrinfo *res;
    int s;

    bzero(&hints, sizeof(struct addrinfo));
    hints.ai_family = AF_INET; // ipv4 addrs 
    hints.ai_socktype = SOCK_STREAM; // TCP
    hints.ai_flags = AI_PASSIVE;
    s = getaddrinfo(NULL, port, &hints, &res);
    //assert(s==0);
    if (s)
    {
        fprintf(stderr, "failed to getaddrinfo: %s\n", gai_strerror(s));
        return NULL;
    }
    return res;
}


struct addrinfo* tcpipv6_getaddrinfo(char* port)
{
    struct addrinfo hints;
    struct addrinfo *res;
    int s;

    bzero(&hints, sizeof(struct addrinfo));
    hints.ai_family = AF_INET6; // ipv4 addrs 
    hints.ai_socktype = SOCK_STREAM; // TCP
    hints.ai_flags = AI_PASSIVE;
    s = getaddrinfo(NULL, port, &hints, &res);
    //assert(s==0);
    if (s)
    {
        fprintf(stderr, "failed to getaddrinfo-ipv6: %s\n", gai_strerror(s));
        return NULL;
    }
    return res;
}

int set_nonblock(int fd)
{
    int flags = -1;
    if(-1 == (flags = fcntl(fd, F_GETFL, 0)))
    {
        return -1;
    }
    flags |= O_NONBLOCK;
    if( fcntl(fd, F_SETFL, flags) == -1 )
    {
        return -1;
    }
    return 0;
}

int tcpipv4_createfd_bind(struct addrinfo* rp)
{
    int flags = -1;
    int s;
    // create socket 
    int sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    //assert(sfd!=-1);
    if (sfd == -1) 
    {
        fprintf(stderr, "failed to create socket\n");
        return -1;
    }
    // bind
    s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
    //assert(s==0);
    if(s!=0)
    {
        fprintf(stderr, "failed to bind socket %d\n", sfd);
        return -1;
    }
    // nonblock
    s = set_nonblock(sfd);
    //assert(s != -1);
    if (s == -1)
    {
        fprintf(stderr, "failed to set nonblocking socket %d\n", sfd);
        return -1;
    }
    return sfd;
}

int writen(int fd, char * buf, size_t len)
{
    char * cur = buf;
    int n = -1;
    while(len>0)
    {
        n = write(fd, cur, len);
        if (n<=0)
        {
            if(errno == EINTR) continue;
            else return -1;
        }
        len -= n;
        cur += n;
    }
    return 0;
}

int readn(int fd, char* buf, size_t len)
{
    char *cur = buf;
    int n = -1;
    while (len>0)
    {
        n = read(fd, cur, len);
        if (n == -1)
        {
            if (errno == EINTR)
                continue;
            else break;
        }
        else if (n == 0)
            break;
        cur += n; len -= n;
    }
    return (int)(cur-buf);
}

void accept_handler(int efd, int listenfd)
{
    struct epoll_event event;
    int s;
    while(1)
    {
        struct sockaddr in_addr;
        socklen_t in_addrlen = sizeof(struct sockaddr);
        int infd = -1;
        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

        infd = accept(listenfd, &in_addr, &in_addrlen);
        //assert(infd != -1);
        if(infd == -1)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
                ;
            else
                perror("failed to accept\n");
            return;
        }
        s = getnameinfo(&in_addr, in_addrlen, 
                hbuf, sizeof(hbuf), 
                sbuf, sizeof(sbuf),
                NI_NUMERICHOST | NI_NUMERICSERV);
        //assert(s == 0);
        if(s == 0)
        {
            printf("Accept fd %d host %s port %s\n", infd, hbuf, sbuf);
            s = set_nonblock(infd);
            //assert(s!=-1);
            event.data.fd = infd;
            event.events = EPOLLIN | EPOLLET;
            s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
            //assert(s != -1);
            return;
        }
    }
    return;
}

void read_handler(int efd, int fd)
{
    //do sonething with buf.
    int s = -1;
    s=readn(fd, buf, sizeof(buf));
    buf[s] = 0;
    //printf("recv %d bytes: %s", s, buf);
    if(s < 0)
    {
        close(fd);
        if(-1 == epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL) )
            fprintf(stderr, "failed to del event of %d\n", fd);
        printf("close conection on fd %d", fd);
    }
    else if(s > 0)
    {
        //std::cout << buf << std::endl;
        //do sonething with buf.
        user_recv_handler(efd, fd, buf, s);
    }
}


void write_handler(int efd, int fd)
{
    writen(fd, buf, strlen(buf));
    if(-1 == epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL) )
        fprintf(stderr, "failed to del event of %d\n", fd);
//  close(fd);
}

int main(int argc, char ** argv)
{
    char* port = NULL;
    int listenfd = -1;
    struct addrinfo* hostaddr=NULL; 
    struct addrinfo* rp = NULL;
    struct epoll_event event;
    struct epoll_event * events, *cur_ev;
    int efd = -1;
    int num_ev = -1;
    int s;

    port = argv[1];
    // get server ipv4 address by getaddrinfo
    (rp = hostaddr = tcpipv4_getaddrinfo(port));
    // create and bind listening socket
    for(; rp; rp = rp->ai_next)
    {
        (listenfd = tcpipv4_createfd_bind(rp));
        if(-1 == listenfd)
            continue;
    }
    freeaddrinfo(hostaddr); 
    //assert(listenfd!=-1);
    if(listenfd==-1)
        exit(EXIT_FAILURE);
    //start listening 
    (s = listen(listenfd, SOMAXCONN));
    //assert(s!=-1);
    if(s == -1)
        exit(EXIT_FAILURE);
    // create epoll
    efd = epoll_create(MAXEVENTS);
    //assert(efd != -1);
    if(efd == -1)
        exit(EXIT_FAILURE);

    event.data.fd = listenfd;
    // epoll: read, ET
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &event);
    //assert(s!=-1);
    if(s==-1)
        exit(EXIT_FAILURE);

    events = (struct epoll_event*)calloc(MAXEVENTS, sizeof(struct epoll_event));

    // event loop;
    while (1)
    {
        num_ev = epoll_wait(efd, events, MAXEVENTS, -1);
        // for each active event: 
        while(num_ev--)
        {
            cur_ev = events+num_ev;
            // close the fd if error (ERR) or hang up (HUP)
            if(cur_ev->events & EPOLLERR || 
                cur_ev->events & EPOLLHUP)
            {
                fprintf(stderr, "epoll get event error\n");
                close(cur_ev->data.fd);
                continue;
            }
            // one or more new connections (fd = listenfd)
            else if(cur_ev->data.fd == listenfd)
            {
                accept_handler(efd, listenfd);
                continue;
            }
            else if(cur_ev->events & EPOLLIN)
            {
                // since the registered event is EPOLLIN, 
                // here we have data on fd waiting for reading.     
                read_handler(efd, cur_ev->data.fd);
            }
            else if (cur_ev->events & EPOLLOUT)
            {
                write_handler(efd, cur_ev->data.fd);
            }
        }
    }
    free(events); events = NULL;
    close(listenfd);
    exit(EXIT_SUCCESS);
}

Client:

int connected_count=0;
int i=0;
struct timespec tstart={0,0}, tend={0,0};
clock_gettime(CLOCK_MONOTONIC, &tstart);
for(; i!=10000; i++)
{
    int sockfd;
    int portno = 4000;
    ssize_t n;
    struct sockaddr_in serveraddr;
    struct hostent* server;
    char hostname[] = "127.0.0.1";
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        printf("ERROR opening socket");
        printf("error %d",errno);
        test_function_killall(NULL);
        return;
    }
    server = gethostbyname(hostname);
    if(server == NULL)
    {
        fprintf(stderr,"ERROR, no such host as %s\n", hostname);
        test_function_killall(NULL);
        return;
    }
    bzero((char*)&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    bcopy((char*)server->h_addr, (char*)&serveraddr.sin_addr.s_addr, server->h_length);
    serveraddr.sin_port = htons(portno);
    if(connect(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0) 
    {
        printf("ERROR connecting");
        test_function_killall(NULL);
        return;
    }
    else
    {
        std::cout << "active connections " << connected_count++ << std::endl;
    }
    set_nonblock(sockfd);
}   
if(connected_count==10000)
{       
    printf("complete");
}

Solution

  • Start with this:

    • Remove the EPOLLET flags from the listen socket registration. You might want to remove it from the client connection sockets too.

    • Set your listen socket to non-blocking similar to how the client connection sockets returned from accept are set.

    There's all kinds of edge cases with the listen sockets. I'm not sure, but it appears you aren't fully draining the accept queue when epoll_ctl comes back to indicate that the listen socket has a connection ready. (Think: multiple incoming connections on one edge trigger). It's possible you are blocking on accept and/or stuck on epoll_wait.

    Update:

    • Another thing you could be hitting are the system limits for the maximum number of files handles per process. Read up here.