event-handling, epoll, epollet

Is rearming file descriptors for epoll thread safe?


From this question I know that I can call epoll_ctl(2) while another thread is blocked in epoll_wait(2). I still have a question though.

When using epoll with the EPOLLONESHOT flag, only one event is fired and the fd then has to be rearmed using epoll_ctl(2). The flag guarantees that only one thread reads from the fd and handles the result at a time.

The following is a timeline that somewhat visualizes my supposed problem:

Thread1:                       Thread2:                  Kernel:
-----------------------------------------------------------------------
epoll_wait();
                                                         Receives chunk
dispatch chunk to thread 2
epoll_wait();                  Handle chunk
                               Still handle chunk        Receives chunk
                               Rearm fd for epoll
?

What happens on the question mark when the fd is rearmed after a chunk is received? Will epoll fire an EPOLLIN event, or will it block indefinitely although the socket is readable? Is my architecture at all sensible?


Solution

  • Your architecture is sensible, and it will work: epoll will mark the file descriptor as readable and fire an EPOLLIN event.

    The documentation on this is scarce and subtle; the Q/A section of man 7 epoll briefly mentions this:

    Q8 Does an operation on a file descriptor affect the already collected but not yet reported events?

    A8 You can do two operations on an existing file descriptor. Remove would be meaningless for this case. Modify will reread available I/O.

    The two operations that you can do on an existing file descriptor are delete (EPOLL_CTL_DEL) and modify (EPOLL_CTL_MOD). An existing file descriptor here is one that has been added to the epoll set in the past, which includes file descriptors that are waiting to be rearmed. As the manpage mentions, delete is meaningless here, and modify re-evaluates the readiness conditions on the file descriptor, so data that arrived while it was disarmed is reported once it is rearmed.

    Nothing beats a real world experiment though. The following program tests this edge case:

    #include <stdio.h>
    #include <pthread.h>
    #include <signal.h>
    #include <stdlib.h>
    #include <assert.h>
    #include <semaphore.h>
    #include <sys/epoll.h>
    #include <unistd.h>
    
    static pthread_t tids[2];
    static int epoll_fd;
    static char input_buff[512];
    static sem_t chunks_sem;
    
    void *dispatcher(void *arg) {
        struct epoll_event epevent;
    
        while (1) {
            printf("Dispatcher waiting for more chunks\n");
            if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) {
                perror("epoll_wait(2) error");
                exit(EXIT_FAILURE);
            }
    
            ssize_t n;
            if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) {
                if (n < 0)
                    perror("read(2) error");
                else
                    fprintf(stderr, "stdin closed prematurely\n");
                exit(EXIT_FAILURE);
            }
    
            input_buff[n] = '\0';
            sem_post(&chunks_sem);
        }
    
        return NULL;
    }
    
    void *consumer(void *arg) {
        sigset_t smask;
        sigemptyset(&smask);
        sigaddset(&smask, SIGUSR1);
    
        while (1) {
            sem_wait(&chunks_sem);
            printf("Consumer received chunk: %s", input_buff);
            /* Simulate some processing... */
            sleep(2);
            printf("Consumer finished processing chunk.\n");
            printf("Please send SIGUSR1 after sending more data to stdin\n");
    
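            int signo;
            /* sigwait(3) returns a positive error number on failure; it does not set errno */
            int err = sigwait(&smask, &signo);
            if (err != 0) {
                fprintf(stderr, "sigwait(3) error: %d\n", err);
                exit(EXIT_FAILURE);
            }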
    
            assert(signo == SIGUSR1);
    
            struct epoll_event epevent;
            epevent.events = EPOLLIN | EPOLLONESHOT;
            epevent.data.fd = STDIN_FILENO;
    
            if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) {
                perror("epoll_ctl(2) error when attempting to readd stdin");
                exit(EXIT_FAILURE);
            }
    
            printf("Readded stdin to epoll fd\n");
        }
    }
    
    int main(void) {
    
        sigset_t sigmask;
        sigfillset(&sigmask);
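        /* pthread_sigmask(3) returns an error number on failure; it does not set errno */
        int rc = pthread_sigmask(SIG_SETMASK, &sigmask, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_sigmask(3) error: %d\n", rc);
            exit(EXIT_FAILURE);
        }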
    
        if ((epoll_fd = epoll_create(1)) < 0) {
            perror("epoll_create(2) error");
            exit(EXIT_FAILURE);
        }
    
        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLONESHOT;
        epevent.data.fd = STDIN_FILENO;
    
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) {
            perror("epoll_ctl(2) error");
            exit(EXIT_FAILURE);
        }
    
        if (sem_init(&chunks_sem, 0, 0) < 0) {
            perror("sem_init(3) error");
            exit(EXIT_FAILURE);
        }
    
        /* pthread_create(3) and pthread_join(3) return an error number on failure; they do not set errno */
        int err;
        if ((err = pthread_create(&tids[0], NULL, dispatcher, NULL)) != 0) {
            fprintf(stderr, "pthread_create(3) error on dispatcher: %d\n", err);
            exit(EXIT_FAILURE);
        }
    
        if ((err = pthread_create(&tids[1], NULL, consumer, NULL)) != 0) {
            fprintf(stderr, "pthread_create(3) error on consumer: %d\n", err);
            exit(EXIT_FAILURE);
        }
    
        size_t i;
        for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) {
            if ((err = pthread_join(tids[i], NULL)) != 0) {
                fprintf(stderr, "pthread_join(3) error: %d\n", err);
                exit(EXIT_FAILURE);
            }
        }
    
        return 0;
    }
    

    It works as follows: the main thread adds stdin to an epoll set, and the dispatcher thread then uses epoll_wait(2) to fetch input from stdin whenever it becomes readable. When input arrives, the dispatcher wakes up the worker thread (consumer in the code), which prints the input and simulates some processing time by sleeping 2 seconds. In the meantime, the dispatcher goes back to the top of its loop and blocks in epoll_wait(2) again.

    The worker thread won't rearm stdin until you tell it to by sending SIGUSR1 to the process. So, we just write some more data to stdin, and then send SIGUSR1. The worker thread receives the signal, and only then does it rearm stdin, which is already readable by that time, while the dispatcher is already waiting in epoll_wait(2).

    You can see from the output that the dispatcher is correctly woken up and everything works like a charm:

    Dispatcher waiting for more chunks
    testing 1 2 3 // Input
    Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again
    Consumer received chunk: testing 1 2 3
    Consumer finished processing chunk.
    Please send SIGUSR1 after sending more data to stdin
    hello world // Input
    Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting
    Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again
    Consumer received chunk: hello world
    Consumer finished processing chunk.
    Please send SIGUSR1 after sending more data to stdin