Search code examples
c++socketsgcclibev

libev more then one watcher per fd fails


I can't seem to get working 2 watchers per socket.. Code below doesn't acually works at all, but if i mix up these calls(for ex. call init/set/start for 1 watcher and then for other), i get only 1 watcher working.. Is there something I'm missing badly here...? I don't think it has anything to do with loops and setup... I have 1 accept loop(default loop) and 1 loop for accepted connections. I tried both, running code below directly after accepting connection on accept loop and via ev_async_send(...) then executing this code from other io loop. Results were same. Also setting both events on 1 watcher works fine too.

Thank you

ev_init (pSockWatcher->_wW, &CNetServer::send_cb);
ev_init (pSockWatcher->_wR, &CNetServer::recv_cb);

ev_io_set (pSockWatcher->_wW, pSockWatcher->_sd, EV_WRITE );
ev_io_set (pSockWatcher->_wR, pSockWatcher->_sd, EV_READ );


ev_io_start (loop, pSockWatcher->_wR);
ev_io_start (loop, pSockWatcher->_wW);

Solution

  • Well, here is an example with two I/O watchers on one socket fd, which seems to work fine for me. I am using the ev_io_init() function, however, not the ev_init() and ev_set().

    #include <ev.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    
    static struct ev_loop *loop;
    static ev_timer timeout_watcher;
    static ev_io in_watcher, out_watcher;
    static ev_idle idle_watcher;
    static int sock_fd;
    
    // socket input watcher
    static void in_cb(EV_P_ ev_io *watcher, int revents) {
        int r, t;
        char buf[1024];
        for (t = 0; (r = read(sock_fd, buf, sizeof(buf))) > 0;) {
            t += r;
            write(STDOUT_FILENO, buf, r); // copy input to stdout
            if (buf[r-1] == '\n') break; // operate line-at-a-time
        }
        fprintf(stderr, "in: count = %d\n", t);
        if (r == 0) {
            fputs("in: connection closed\n", stderr);
            ev_io_stop(loop, &in_watcher); // stop the socket watcher
            ev_break(loop, EVBREAK_ALL); // exit the loop
        } else if (r < 0) {
            perror("read");
        }
    }
    
    // socket output watcher
    static void out_cb(EV_P_ ev_io *watcher, int revents) {
        int r, t, lim;
        char buf[1024];
        ev_io_stop(loop, &out_watcher);
        for (t = 0; t < sizeof(buf); t++) {
            buf[t] = 'a' + (rand() % 26);
        }
        for (t = 0, lim = rand() % 10000 + 1000;
                (r = write(sock_fd, buf, (lim - t > sizeof(buf)) ? sizeof(buf) : lim - t)) > 0;) {
            t += r;
            if (t >= lim) break;
        }
        if (r < 0) {
            perror("write");
        }
        fprintf(stderr, "out: finished sending, count = %d\n", t);
    }
    
    static void timeout_cb(EV_P_ ev_timer *watcher, int revents) {
        fprintf(stderr, "timeout: now = %f\n", ev_now(loop));
        // send a bunch of stuff on the socket when able
        ev_io_start (loop, &out_watcher);
    }
    
    static void idle_cb(EV_P_ ev_idle *watcher, int revents) {
        static long idle_count = 0;
        fprintf(stderr, "idle: count = %ld\n", ++idle_count);
        sleep(1); // simulate doing stuff
    }
    
    int main() {
    
        extern int errno;
    
        int master_fd;
        int sock_opt = 1;
        int conn_port = 7000;
        struct sockaddr_in addr;
        socklen_t addrlen;
    
        // **** the following is needed to set up a socket to receive data ****
        master_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (master_fd == -1) {
            perror("socket");
            return errno;
        }
        if (setsockopt(master_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &sock_opt, sizeof(sock_opt)) == -1) {
            perror("setsockopt");
            return errno;
        }
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = htons(conn_port);
        addrlen = sizeof(addr);
        if (bind(master_fd, (struct sockaddr *) &addr, addrlen) != 0) {
            perror("bind");
            return errno;
        }
        if (listen(master_fd, 3) != 0) {
            perror("listen");
            return errno;
        }
    
        fprintf(stderr, "awaiting a connection on port %d\n", conn_port);
        sock_fd = accept(master_fd, (struct sockaddr *) &addr, &addrlen);
        if (sock_fd == -1) {
            perror("accept");
            return errno;
        }
        fputs("in: connection established\n", stderr);
        // **** end of socket setup code ****
    
        // define a loop
        loop = ev_default_loop(0);
    
        // define a repeating timer
        ev_timer_init (&timeout_watcher, timeout_cb, 5.0, 5.0);
        ev_timer_start (loop, &timeout_watcher);
    
        // define an idle process
        ev_idle_init(&idle_watcher, idle_cb);
        ev_idle_start (loop, &idle_watcher);
    
        // define the socket data receiver
        ev_io_init(&in_watcher, in_cb, sock_fd, EV_READ);
        ev_io_start (loop, &in_watcher);
    
        // define the socket data write complete watcher
        ev_io_init(&out_watcher, out_cb, sock_fd, EV_WRITE);
    
        // run the loop
        ev_run(loop, 0);
    
        // clean up
        close(sock_fd);
        close(master_fd);
        return 0;
    }