Search code examples
csocketslibevent

How to detect a client gets connected to the server using Libevent


I am writing program that sets up a localhost server and a localhost client using libevent. And I need to know when the client gets connected to the server. Libevent provides six event signals for event callback, which are BEV_EVENT_READING, BEV_EVENT_WRITING, BEV_EVENT_ERROR, BEV_EVENT_TIMEOUT, BEV_EVENT_EOF, and BEV_EVENT_CONNECTED. Basically I wanna the client enter the event loop and keep waiting server to be online. If I simply run client program without server online, I sometimes get both BEV_EVENT_READING and BEV_EVENT_CONNECTED flags. The error for BEV_EVENT_READING is connection refused which makes senses since the server is offline. However, the flag BEV_EVENT_CONNECTED in this case is also set and the program immediately leaves even though I dispatch it. Below is the code piece

address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_port = htons(7777);
buffev_ptr = bufferevent_socket_new(_evbase_ptr, -1, BEV_OPT_CLOSE_ON_FREE);

bufferevent_setcb(buffev_ptr, myread, mywrite, mysignal, NULL);
bufferevent_enable(buffev_ptr, EV_READ | EV_WRITE);

bufferevent_socket_connect(buffev_ptr, (sockaddr_pt)&address, sizeof(address));

So for the mysignal callback, what should I exactly write in order to deal with the situation where the client gets connected?

void_t Client::mysignal(buffev_pt bev, short flag, void_pt user_data) {

  // An event occurred during a read operation on the bufferevent.
  if(flag & BEV_EVENT_READING) {
  }

  // An event occurred during a write operation on the bufferevent.
  if(flag & BEV_EVENT_WRITING) {
  }

  // An error occurred during a bufferevent operation.
  if(flag & BEV_EVENT_ERROR) {
  }

  // A timeout expired on the bufferevent.
  if(flag & BEV_EVENT_TIMEOUT) {
  }

  // We got an end-of-line indication on the bufferevent.
  if(flag & BEV_EVENT_EOF) {
  }

  // We finished a requested connection on the bufferevent.
  if(flag & BEV_EVENT_CONNECTED) {
  }
}

The BEV_EVENT_CONNECTED and BEV_EVENT_ERROR might happen simultaneously if I run the client program without server online. I was assuming that BEV_EVENT_CONNECTED will be only set when client get connected to the server but it seems not.


Solution

  • Here's a good example page: http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html

    A few things: bufferevent_socket_connect can fail immediately [Side note: the socket is made non-blocking]. That was not handled correctly. It is possible to get CONNECT and ERROR simultaneously. I've added some things to your code and annotated it [please pardon the gratuitous style cleanup].

    Your code was just processing events without keeping track of any "state" information, so I've also added a "state" struct that you can use to determine how to handle the incoming events as far as determining whether the connection happened and whether it is usable. Far from perfect, but it may help:

    // connection state control
    struct privdata {
        u32 priv_evseen;                    // events previously seen
        struct trace_buffer *priv_trace;    // trace buffer to aid debug (e.g.)
        ...
    };
    
    int
    start_connection(...)
    {
        struct privdata *data;
        int err;
    
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = inet_addr("127.0.0.1");
        address.sin_port = htons(7777);
        buffev_ptr = bufferevent_socket_new(_evbase_ptr, -1, BEV_OPT_CLOSE_ON_FREE);
    
        data = calloc(1,sizeof(struct privdata));
    
        bufferevent_setcb(buffev_ptr, myread, mywrite, mysignal, data);
        bufferevent_enable(buffev_ptr, EV_READ | EV_WRITE);
    
        // NOTE: this can fail immediately
        err = bufferevent_socket_connect(buffev_ptr,
            (sockaddr_pt) &address,sizeof(address));
    
        // NOTE: if it fails above, the connection attempt will _not_ be retried by
        // libevent -- we _must_ tear down and recreate from scratch (i.e. libevent
        // has already released the socket it tried to connect with)
        //
        // we'll have to decide how/when/if we want to reenter this function to
        // try again (e.g. server is offline and we requeue externally to call us
        // [say] every 5 minutes, with message "server offline -- will retry in
        // 5 minutes")
        if (err < 0) {
            err = errno;
    
            event_base_free(_evbase_ptr);
            free(data);
    
            errno = err;
            return -1;
        }
    
        event_base_dispatch(base);
    
        return 0;
    }
    
    void_t
    Client::mysignal(buffev_pt bev, short flag, void_pt user_data)
    {
        struct privdata *priv;
    
        priv = user_data;
    
        // NOTE: it _is_ possible to get an event that has _both_ CONNNECTED and
        // ERROR because when the connection occurs, libevent issues a getsockopt
        // to check for errors
        //
        // we can use our private data to determine if an ERROR occurs at the
        // connection point (e.g. priv_evseen does _not_ have CONNECTED set) or
        // later during normal socket operation
    
        do {
            // wait for connection
            // NOTE: this should also handle other errors like timeout
            if ((priv->priv_evseen & CONNECTED) == 0) {
                switch (flag & (ERROR | CONNECTED)) {
                case CONNECTED: // got clean connection -- puppy is happy!
                    break;
    
                case ERROR: // got error first -- CONNECTED event unlikely
                    break;
    
                case (CONNECTED | ERROR):  // connected with error -- how bad is it?
                    break;
    
                case 0:  // huh? -- the only excuse ...
                    if (flag & TIMEOUT)
                        ...
                    break;
                }
                break;
            }
    
            // events that occur during normal operation ...
    
            // An error occurred during a bufferevent operation.
            if (flag & ERROR) {
            }
    
            // A timeout expired on the bufferevent.
            if (flag & TIMEOUT) {
            }
    
            // We got an end-of-line indication on the bufferevent.
            if (flag & EOF) {
            }
    
            // We finished a requested connection on the bufferevent.
            // NOTE: this should _never_ happen now
            if (flag & CONNECTED) {
            }
    
            // An event occurred during a read operation on the bufferevent.
            if (flag & READING) {
            }
    
            // An event occurred during a write operation on the bufferevent.
            if (flag & WRITING) {
            }
        } while (0);
    
        // remember types we've seen before
        priv->priv_seen |= flag;
    }
    

    Here are some of the relevant libevent sources:

    int
    bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *sa, int socklen)
    {
        struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    
        evutil_socket_t fd;
        int r = 0;
        int result = -1;
        int ownfd = 0;
    
        _bufferevent_incref_and_lock(bev);
    
        if (!bufev_p)
            goto done;
    
        fd = bufferevent_getfd(bev);
        if (fd < 0) {
            if (!sa)
                goto done;
            fd = socket(sa->sa_family, SOCK_STREAM, 0);
            if (fd < 0)
                goto done;
            if (evutil_make_socket_nonblocking(fd) < 0)
                goto done;
            ownfd = 1;
        }
        if (sa) {
    #ifdef WIN32
            if (bufferevent_async_can_connect(bev)) {
                bufferevent_setfd(bev, fd);
                r = bufferevent_async_connect(bev, fd, sa, socklen);
                if (r < 0)
                    goto freesock;
                bufev_p->connecting = 1;
                result = 0;
                goto done;
            }
            else
    #endif
                r = evutil_socket_connect(&fd, sa, socklen);
            if (r < 0)
                goto freesock;
        }
    #ifdef WIN32
        /* ConnectEx() isn't always around, even when IOCP is enabled. Here, we borrow the socket object's write handler to fall back on a non-blocking connect() when ConnectEx() is unavailable. */
        if (BEV_IS_ASYNC(bev)) {
            event_assign(&bev->ev_write, bev->ev_base, fd, EV_WRITE | EV_PERSIST, bufferevent_writecb, bev);
        }
    #endif
        bufferevent_setfd(bev, fd);
        if (r == 0) {
            if (!be_socket_enable(bev, EV_WRITE)) {
                bufev_p->connecting = 1;
                result = 0;
                goto done;
            }
        }
        else if (r == 1) {
            /* The connect succeeded already. How very BSD of it. */
            result = 0;
            bufev_p->connecting = 1;
            event_active(&bev->ev_write, EV_WRITE, 1);
        }
        else {
            /* The connect failed already.  How very BSD of it. */
            bufev_p->connection_refused = 1;
            bufev_p->connecting = 1;
            result = 0;
            event_active(&bev->ev_write, EV_WRITE, 1);
        }
    
        goto done;
    
      freesock:
        _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
        if (ownfd)
            evutil_closesocket(fd);
        /* do something about the error? */
      done:
        _bufferevent_decref_and_unlock(bev);
        return result;
    }
    
    /* XXX we should use an enum here. */
    /* 2 for connection refused, 1 for connected, 0 for not yet, -1 for error. */
    int
    evutil_socket_connect(evutil_socket_t * fd_ptr, struct sockaddr *sa, int socklen)
    {
        int made_fd = 0;
    
        if (*fd_ptr < 0) {
            if ((*fd_ptr = socket(sa->sa_family, SOCK_STREAM, 0)) < 0)
                goto err;
            made_fd = 1;
            if (evutil_make_socket_nonblocking(*fd_ptr) < 0) {
                goto err;
            }
        }
    
        if (connect(*fd_ptr, sa, socklen) < 0) {
            int e = evutil_socket_geterror(*fd_ptr);
    
            if (EVUTIL_ERR_CONNECT_RETRIABLE(e))
                return 0;
            if (EVUTIL_ERR_CONNECT_REFUSED(e))
                return 2;
            goto err;
        }
        else {
            return 1;
        }
    
      err:
        if (made_fd) {
            evutil_closesocket(*fd_ptr);
            *fd_ptr = -1;
        }
        return -1;
    }
    
    /* Check whether a socket on which we called connect() is done
       connecting. Return 1 for connected, 0 for not yet, -1 for error.  In the
       error case, set the current socket errno to the error that happened during
       the connect operation. */
    int
    evutil_socket_finished_connecting(evutil_socket_t fd)
    {
        int e;
        ev_socklen_t elen = sizeof(e);
    
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *) &e, &elen) < 0)
            return -1;
    
        if (e) {
            if (EVUTIL_ERR_CONNECT_RETRIABLE(e))
                return 0;
            EVUTIL_SET_SOCKET_ERROR(e);
            return -1;
        }
    
        return 1;
    }
    
    struct bufferevent *
    bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)
    {
        struct bufferevent_private *bufev_p;
        struct bufferevent *bufev;
    
    #ifdef WIN32
        if (base && event_base_get_iocp(base))
            return bufferevent_async_new(base, fd, options);
    #endif
    
        if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private))) == NULL)
            return NULL;
    
        if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, options) < 0) {
            mm_free(bufev_p);
            return NULL;
        }
        bufev = &bufev_p->bev;
        evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
    
        event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ | EV_PERSIST, bufferevent_readcb, bufev);
        event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE | EV_PERSIST, bufferevent_writecb, bufev);
    
        evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
    
        evbuffer_freeze(bufev->input, 0);
        evbuffer_freeze(bufev->output, 1);
    
        return bufev;
    }
    
    static void
    bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        int res = 0;
        short what = BEV_EVENT_WRITING;
        int connected = 0;
        ev_ssize_t atmost = -1;
    
        _bufferevent_incref_and_lock(bufev);
    
        if (event == EV_TIMEOUT) {
            /* Note that we only check for event==EV_TIMEOUT. If event==EV_TIMEOUT|EV_WRITE, we can safely ignore the timeout, since a read has occurred */
            what |= BEV_EVENT_TIMEOUT;
            goto error;
        }
        if (bufev_p->connecting) {
            int c = evutil_socket_finished_connecting(fd);
    
            /* we need to fake the error if the connection was refused immediately - usually connection to localhost on BSD */
            if (bufev_p->connection_refused) {
                bufev_p->connection_refused = 0;
                c = -1;
            }
    
            if (c == 0)
                goto done;
    
            bufev_p->connecting = 0;
            if (c < 0) {
                event_del(&bufev->ev_write);
                event_del(&bufev->ev_read);
                _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
                goto done;
            }
            else {
                connected = 1;
    #ifdef WIN32
                if (BEV_IS_ASYNC(bufev)) {
                    event_del(&bufev->ev_write);
                    bufferevent_async_set_connected(bufev);
                    _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
                    goto done;
                }
    #endif
                _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
                if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) {
                    event_del(&bufev->ev_write);
                    goto done;
                }
            }
        }
    
        atmost = _bufferevent_get_write_max(bufev_p);
    
        if (bufev_p->write_suspended)
            goto done;
    
        if (evbuffer_get_length(bufev->output)) {
            evbuffer_unfreeze(bufev->output, 1);
            res = evbuffer_write_atmost(bufev->output, fd, atmost);
            evbuffer_freeze(bufev->output, 1);
            if (res == -1) {
                int err = evutil_socket_geterror(fd);
    
                if (EVUTIL_ERR_RW_RETRIABLE(err))
                    goto reschedule;
                what |= BEV_EVENT_ERROR;
            }
            else if (res == 0) {
                /* eof case XXXX Actually, a 0 on write doesn't indicate an EOF. An ECONNRESET might be more typical. */
                what |= BEV_EVENT_EOF;
            }
            if (res <= 0)
                goto error;
    
            _bufferevent_decrement_write_buckets(bufev_p, res);
        }
    
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
    
        /*
         * Invoke the user callback if our buffer is drained or below the
         * low watermark.
         */
        if ((res || !connected) && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
            _bufferevent_run_writecb(bufev);
        }
    
        goto done;
    
      reschedule:
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
        goto done;
    
      error:
        bufferevent_disable(bufev, EV_WRITE);
        _bufferevent_run_eventcb(bufev, what);
    
      done:
        _bufferevent_decref_and_unlock(bufev);
    }