Search code examples
c sockets network-programming sigpipe epipe

Broken pipe (EPIPE) on connection to loopback address


I'm currently testing my networking code. This involves making a connection via the IPv4 loopback address (127.0.0.1). Unfortunately the program often (not always) gives an EPIPE error on sending data.

I am using Berkeley network sockets and libevent. I make a non-blocking socket via:

// Creates a TCP stream socket and switches it to non-blocking mode.
// socketID: receives the value returned by socket(), widened to uint64_t.
// IPv6: true requests a PF_INET6 socket, false a PF_INET socket.
// Returns CB_SOCKET_OK on success, CB_SOCKET_NO_SUPPORT when socket() fails with
// EAFNOSUPPORT or EPROTONOSUPPORT, and CB_SOCKET_BAD for any other socket() failure.
// The results of setsockopt() and evutil_make_socket_nonblocking() are not inspected.
CBSocketReturn CBNewSocket(uint64_t * socketID,bool IPv6){
    const int family = IPv6 ? PF_INET6 : PF_INET;
    *socketID = socket(family, SOCK_STREAM, 0);
    if (*socketID == (uint64_t)-1) {
        // socket() failed: separate "family/protocol unavailable" from everything else.
        const bool unsupported = (errno == EAFNOSUPPORT) || (errno == EPROTONOSUPPORT);
        return unsupported ? CB_SOCKET_NO_SUPPORT : CB_SOCKET_BAD;
    }
    if (CB_NOSIGPIPE) {
        // Ask the socket layer not to raise SIGPIPE for this socket.
        int enable = 1;
        setsockopt(*socketID, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable));
    }
    // All later I/O on this socket is expected to go through the event loop.
    evutil_make_socket_nonblocking((evutil_socket_t)*socketID);
    return CB_SOCKET_OK;
}

I make a connection event via:

// Creates (but does not add) the event that reports the outcome of a connection attempt.
// eventID: receives the address of the new CBEvent as an integer, or 0 on failure.
// loopID: a CBEventLoop pointer passed as an integer; its ->base is the event base used.
// socketID: the socket to watch.
// onDidConnect: stored in the CBEvent's onEvent.ptr for CBDidConnect to call.
// node: opaque pointer stored in the CBEvent and handed back to the callbacks.
// Returns true on success, false if memory allocation or event_new() fails.
// The event is registered for EV_TIMEOUT|EV_WRITE without EV_PERSIST, with CBDidConnect
// as the libevent callback and the CBEvent as its argument.
bool CBSocketDidConnectEvent(uint64_t * eventID,uint64_t loopID,uint64_t socketID,void (*onDidConnect)(void *,void *),void * node){
    CBEvent * event = malloc(sizeof(*event));
    if (NOT event) {
        // Out of memory: fail the same way as when event_new() fails, rather than
        // dereferencing a null pointer below.
        *eventID = 0;
        return false;
    }
    event->loop = (CBEventLoop *)loopID;
    event->onEvent.ptr = onDidConnect;
    event->node = node;
    event->event = event_new(event->loop->base, (evutil_socket_t)socketID, EV_TIMEOUT|EV_WRITE, CBDidConnect, event);
    if (NOT event->event) {
        // libevent could not create the event; release the wrapper and report failure.
        free(event);
        *eventID = 0;
        return false;
    }
    *eventID = (uint64_t)event;
    return true;
}
// libevent callback for the connection event.
// socketID: the socket the event was created for.
// eventNum: the libevent flags that triggered the callback.
// arg: the CBEvent that was registered with the event.
// On EV_TIMEOUT the loop's onTimeOut callback is called with CB_TIMEOUT_CONNECT.
// Otherwise the socket became writable, which only means the non-blocking connect()
// has finished, not that it succeeded. The outcome is read from SO_ERROR, and the
// stored connect callback is only called when it reports no error.
void CBDidConnect(evutil_socket_t socketID,short eventNum,void * arg){
    CBEvent * event = arg;
    if (eventNum & EV_TIMEOUT) {
        // Timeout for the connection
        event->loop->onTimeOut(event->loop->communicator,event->node,CB_TIMEOUT_CONNECT);
        return;
    }
    // Preset to a non-zero value so a getsockopt() that writes nothing counts as failure.
    int connectError = -1;
    socklen_t connectErrorLen = sizeof(connectError);
    if (getsockopt(socketID, SOL_SOCKET, SO_ERROR, &connectError, &connectErrorLen) != 0 || connectError != 0) {
        // The connection attempt failed (e.g. refused). Sending on this socket would
        // fail with errors such as EPIPE, so do not report it as connected.
        // NOTE(review): reported through the connect-timeout path because no dedicated
        // connect-error code is visible here; confirm that callers treat it as a failed connection.
        event->loop->onTimeOut(event->loop->communicator,event->node,CB_TIMEOUT_CONNECT);
        return;
    }
    // Connection successful
    event->onEvent.ptr(event->loop->communicator,event->node);
}

And add it via:

bool CBSocketAddEvent(uint64_t eventID,uint16_t timeout){
    CBEvent * event = (CBEvent *)eventID;
    int res;
    if (timeout) {
        struct timeval time = {timeout,0};
        res = event_add(event->event, &time);
    }else
        res = event_add(event->event, NULL);
    return NOT res;
}

To connect:

bool CBSocketConnect(uint64_t socketID,uint8_t * IP,bool IPv6,uint16_t port){
    // Create sockaddr_in6 information for a IPv6 address
    int res;
    if (IPv6) {
        struct sockaddr_in6 address;
        memset(&address, 0, sizeof(address)); // Clear structure.
        address.sin6_family = AF_INET6;
        memcpy(&address.sin6_addr, IP, 16); // Move IP address into place.
        address.sin6_port = htons(port); // Port number to network order
        res = connect((evutil_socket_t)socketID, (struct sockaddr *)&address, sizeof(address));
    }else{
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address)); // Clear structure.
        address.sin_family = AF_INET;
        memcpy(&address.sin_addr, IP + 12, 4); // Move IP address into place. Last 4 bytes for IPv4.
        address.sin_port = htons(port); // Port number to network order
        res = connect((evutil_socket_t)socketID, (struct sockaddr *)&address, sizeof(address));
    }
    if (NOT res || errno == EINPROGRESS)
        return true;
    return false;
}

Upon connection the canSend event is made:

// Creates (but does not add) the persistent event that reports when the socket can be written to.
// eventID: receives the address of the new CBEvent as an integer, or 0 on failure.
// loopID: a CBEventLoop pointer passed as an integer; its ->base is the event base used.
// socketID: the socket to watch.
// onCanSend: stored in the CBEvent's onEvent.ptr for CBCanSend to call.
// node: opaque pointer stored in the CBEvent and handed back to the callbacks.
// Returns true on success, false if memory allocation or event_new() fails.
// The event is registered for EV_TIMEOUT|EV_WRITE|EV_PERSIST, with CBCanSend as the
// libevent callback and the CBEvent as its argument.
bool CBSocketCanSendEvent(uint64_t * eventID,uint64_t loopID,uint64_t socketID,void (*onCanSend)(void *,void *),void * node){
    CBEvent * event = malloc(sizeof(*event));
    if (NOT event) {
        // Out of memory: fail the same way as when event_new() fails, rather than
        // dereferencing a null pointer below.
        *eventID = 0;
        return false;
    }
    event->loop = (CBEventLoop *)loopID;
    event->onEvent.ptr = onCanSend;
    event->node = node;
    event->event = event_new(event->loop->base, (evutil_socket_t)socketID, EV_TIMEOUT|EV_WRITE|EV_PERSIST, CBCanSend, event);
    if (NOT event->event) {
        // libevent could not create the event; release the wrapper and report failure.
        free(event);
        *eventID = 0;
        return false;
    }
    *eventID = (uint64_t)event;
    return true;
}
// libevent callback for the can-send event.
// socketID: the socket the event was created for (not used here).
// eventNum: the libevent flags that triggered the callback.
// arg: the CBEvent that was registered with the event.
// Without EV_TIMEOUT the stored send callback is called; with it, the loop's
// onTimeOut callback is called with CB_TIMEOUT_SEND.
void CBCanSend(evutil_socket_t socketID,short eventNum,void * arg){
    CBEvent * wrapper = arg;
    CBEventLoop * loop = wrapper->loop;
    if ((eventNum & EV_TIMEOUT) == 0) {
        // The socket is reported writable: let the owner send.
        wrapper->onEvent.ptr(loop->communicator,wrapper->node);
        return;
    }
    // Timed out while waiting for the socket to become writable.
    loop->onTimeOut(loop->communicator,wrapper->node,CB_TIMEOUT_SEND);
}

But sending often gives an EPIPE error:

// Sends up to len bytes from data on the socket and prints what was sent to stdout.
// socketID: the socket descriptor passed as an integer.
// data: the bytes to send.
// len: number of bytes available in data; at most INT32_MAX are submitted per call
//      so that the byte count always fits the int32_t return value.
// Returns the number of bytes sent (possibly fewer than len), 0 when the socket is
// not currently writable (EAGAIN/EWOULDBLOCK), or CB_SOCKET_FAILURE for any other
// error. On failure errno holds the value set by send().
int32_t CBSocketSend(uint64_t socketID,uint8_t * data,uint32_t len){
    if (len > INT32_MAX)
        len = INT32_MAX; // A short send is already a valid outcome for callers.
    ssize_t res = send((evutil_socket_t)socketID, data, len, CB_SEND_FLAGS);
    // Capture errno straight away: the printf() calls below are allowed to change it.
    const int sendErrno = errno;
    printf("SENT (%li): ",(long)res);
    // Signed index: comparing an unsigned index with res == -1 could otherwise convert
    // -1 to a huge unsigned value and read far past the end of data.
    for (ssize_t x = 0; x < res; x++) {
        printf("%c",data[x]);
    }
    printf("\n");
    if (res >= 0)
        return (int32_t)res;
    errno = sendErrno; // Let callers inspect the real cause of the failure.
    if (sendErrno == EAGAIN || sendErrno == EWOULDBLOCK)
        return 0; // False event. Wait again.
    return CB_SOCKET_FAILURE; // Failure
}

It lands on return CB_SOCKET_FAILURE; and errno is set to EPIPE. Now why would this be? The send flags are just MSG_NOSIGNAL (if it is defined), which I use because SIGPIPE kept interrupting the program with this error. I want EPIPE to make CBSocketSend return CB_SOCKET_FAILURE rather than interrupt the program, but there is no reason for the send to fail with EPIPE, so why is it doing so?

Last time I got the error I noticed that the thread that connects was still inside the connect() call. Is there any danger in having the connection event handled by a different thread from the one that calls connect()?

See the network code in these places:

https://github.com/MatthewLM/cbitcoin/blob/master/test/testCBNetworkCommunicator.c https://github.com/MatthewLM/cbitcoin/tree/master/src/structures/CBObject/CBNetworkCommunicator https://github.com/MatthewLM/cbitcoin/tree/master/dependencies/sockets

Thank you.

Edit: I ran it again and I got the error after connect() had finished.

EDIT 2: It seems the connection event is being given without an accept from the other side.


Solution

  • Below is a simple libevent toy program that synthesizes EINPROGRESS, and then waits for the connection to complete by waiting for EV_WRITE. Basically, this program shows that in your application, you should attempt to do the connect call first, and if it fails with EINPROGRESS, you should wait for completion before performing I/O.

    This is the libevent callback function:

    // libevent callback run when the connecting socket becomes writable.
    // Reads SO_ERROR to confirm that the asynchronous connect succeeded, then stops
    // the event loop. Every failure is reported through assert().
    extern "C" void on_connect (int sock, short ev, void *arg) {
        assert(EV_WRITE == ev);
        std::cout << "got wrieable on: " << sock << '\n';
        // Preset to non-zero so that a getsockopt() which stores nothing still
        // trips the assertion below.
        int so_error = -1;
        socklen_t so_error_len = sizeof so_error;
        getsockopt(sock, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len);
        assert(0 == so_error);
        std::cout << "succesful asynchronous connect on: " << sock << '\n';
        event_loopbreak();
    }
    

    These are some helper functions used by the toy application:

    // Fills *addr with an IPv4 address for the loopback interface and the given
    // port (host byte order in, network byte order stored). The whole structure
    // is zeroed first.
    static void init_addr (struct sockaddr_in *addr, short port) {
        memset(addr, 0, sizeof *addr);
        addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        addr->sin_port = htons(port);
        addr->sin_family = AF_INET;
    }
    
    // Turns `sock` into a listening socket on loopback port 9876 with a backlog
    // of 1, enabling SO_REUSEADDR first. None of the return values are checked.
    static void setup_accept (int sock) {
        struct sockaddr_in listen_addr;
        const int enable = 1;
        init_addr(&listen_addr, 9876);
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof enable);
        bind(sock, (struct sockaddr *)&listen_addr, sizeof listen_addr);
        listen(sock, 1);
    }
    
    // Accepts one pending connection on the listening socket `sock` and returns
    // whatever accept() returns. The peer address is received but not used.
    static int complete_accept (int sock) {
        struct sockaddr_in peer;
        socklen_t peer_len = sizeof peer;
        struct sockaddr *peer_generic = (struct sockaddr *)&peer;
        return accept(sock, peer_generic, &peer_len);
    }
    
    // Starts a connection from `sock` to loopback port 9876 and returns the
    // result of connect() unchanged.
    static int try_connect (int sock) {
        struct sockaddr_in target;
        init_addr(&target, 9876);
        const struct sockaddr *target_generic = (const struct sockaddr *)&target;
        return connect(sock, target_generic, sizeof target);
    }
    

    And the main program is below:

    int main () {
        int accept_sock = socket(PF_INET, SOCK_STREAM, 0);
        setup_accept(accept_sock);
        int sock = socket(PF_INET, SOCK_STREAM, 0);
        fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK);
        std::cout << "trying first connect on: " << sock << '\n';
        int r = try_connect(sock);
        assert(r < 0 && errno == EINPROGRESS);
        event_init();
        struct event ev_connect;
        event_set(&ev_connect, sock, EV_WRITE, on_connect, 0);
        event_add(&ev_connect, 0);
        int new_sock = complete_accept(accept_sock);
        event_dispatch();
        return 0;
    }