c++ c multithreading sockets recvfrom

recvfrom works locally, but in production random ports receive nothing, and which ones changes between executions


I'm working on a multithreaded UDP listener and I'm stuck on a problem that is definitely beyond me.

I need to receive large volumes of UDP packets on several ports. Locally, the best solution for me was to call non-blocking recvfrom in as many threads as there are ports I'm listening on (select and poll were too slow for my requirements). I'm using a thread pool manager that simply spawns threads and queues tasks. Here's the code:

void receiveFromSocket(void * arguments){

    sockaddr_in client;  // Local
    socklen_t clientSize = sizeof(client);
    memset(&client, 0, sizeof(client));
    struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;

    for(;;) {
        char buf[158];
        memset(buf,0,158*sizeof(char));
        int n = recvfrom(fd, (char * ) buf, 158, MSG_DONTWAIT, ( struct sockaddr *) &client, &clientSize);

            if(n == -1){
               //cerr << "Error while receiving from client: " << errno << endl;
               continue;
            }

            if(n != 158){
               cerr << "Discarded message since it's not 158 bytes." << endl;
               continue;
            }
            struct arg_struct args;
                args.arg_port = port;
                memcpy(args.buf,buf,158);

            thpool_add_work(globals.thpool, socketThread, (void*)(&args));

    }

}


/// Runs the Socket listener
int network_accept_any()
{
        vector<int>::iterator i;
        for(i = globals.fds.begin(); i != globals.fds.end(); i++){
            int port = distance(globals.fds.begin(),i);
            struct arg_struct_listenPort args;
                args.arg_fd = *i;
                args.arg_port = globals.cmnSystemCatalogs[port].diag_port;
            thpool_add_work(globals.thpool, receiveFromSocket, (void*)(&args));
        }
        cout << "Listening threads created..." << endl;

    return 0;
}

This works perfectly fine locally. But when I compile and run it in the production environment, some ports receive packets and others simply don't! And the set of working ports changes on each execution. I can confirm that it is not a firewall problem. I can also clearly see the packets in Wireshark, and I can receive packets on those ports with netcat. netstat shows all the ports open.

My local environment is an Ubuntu 18.04 VM, and the production environment is a Debian 9.8.

Here's how I create the sockets:

int lSocket(int port) {

    // Create the socket
        int listening = socket(AF_INET, SOCK_DGRAM, 0);
        if (listening == -1) {
            cerr << "Cannot create the socket" << endl;
            exit(EXIT_FAILURE);
        }

        // Bind the socket to an IP / port

        struct sockaddr_in hint;
        memset(&hint, 0, sizeof(hint));
        hint.sin_family = AF_INET; // IPv4
        hint.sin_port = htons(port); // Port
        hint.sin_addr.s_addr = htonl(INADDR_ANY);

        if(bind(listening, (struct sockaddr*)&hint, sizeof(hint)) == -1) { // Bind the settings defined above to the socket
            cerr << "Cannot bind IP/port" << endl;
            exit(EXIT_FAILURE);
        }


        return listening;

}

Any advice is greatly appreciated!

EDIT:

As suggested, I tried switching to blocking I/O, but the main issue remains: I'm still not receiving on all of the open ports.


Solution

  • What an amazing welcome!

    @molbdnilo was absolutely right:

    You're using pointers to objects whose lifetime has ended (&args). This has undefined behaviour - it might appear to work, but it's a bug that needs a-fixin'.

    Here's the fixed code. Gotta be careful when feeding arguments to threads!

    void receiveFromSocket(void * arguments){
    
        sockaddr_in client;  // Local
        memset(&client, 0, sizeof(client));
        // Copy the arguments out, then free the block allocated by network_accept_any
        struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;
        delete args2;
    
        for(;;) {
            char buf[158];
            memset(buf,0,158*sizeof(char));
            // recvfrom overwrites clientSize, so reset it before every call
            socklen_t clientSize = sizeof(client);
            int n = recvfrom(fd, (char * ) buf, 158, MSG_WAITALL, ( struct sockaddr *) &client, &clientSize);
    
            if(n == -1){
                cerr << "Error while receiving from client: " << errno << endl;
                continue;
            }
    
            if(n != 158){
                cerr << "Discarded message since it's not 158 bytes." << endl;
                continue;
            }
            // Heap-allocate so the data outlives this iteration; socketThread must delete it when done
            arg_struct *args = new arg_struct;
            args->arg_port = port;
            memcpy(args->buf,buf,158);
    
            thpool_add_work(globals.thpool, socketThread, (void*)(args));
    
        }
    
    }
    
    
    /// Runs the Socket listener
    int network_accept_any()
    {
            vector<int>::iterator i;
            for(i = globals.fds.begin(); i != globals.fds.end(); i++){
                int port = distance(globals.fds.begin(),i);
                // Heap-allocate per listener so the pointer stays valid after this loop iteration ends
                arg_struct_listenPort *args = new arg_struct_listenPort;
                args->arg_fd = *i;
                args->arg_port = globals.cmnSystemCatalogs[port].diag_port;
                thpool_add_work(globals.thpool, receiveFromSocket, (void*)(args));
            }
            cout << "Listening threads created..." << endl;
    
    
        return 0;
    }
    

    Also, I'll keep an eye on @John Bollinger's and @Superlokkus's comments.

    Thank you all!
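
For anyone landing here with the same bug, here is a minimal sketch of the ownership hand-off in isolation. It is not my production code: PacketTask, DeferredQueue, handle_packet and submit_all are hypothetical names made up for the illustration, and DeferredQueue is only a single-threaded stand-in for the thread pool (it stores the function and argument and runs them later, which is enough to show why the argument must outlive the code that queued it). The worker wraps the raw pointer in a std::unique_ptr straight away, so the task is freed however the function exits.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for arg_struct from the post.
struct PacketTask {
    int port;
    char buf[158];
    std::vector<int>* seen;  // demo only: where the worker records what it saw
};

// Hypothetical stand-in for the thread pool: it only stores (function, argument)
// pairs and runs them later, after the submitting code has moved on.
class DeferredQueue {
public:
    void add_work(void (*fn)(void*), void* arg) { jobs_.emplace_back(fn, arg); }
    void run_all() {
        for (auto& job : jobs_) job.first(job.second);
        jobs_.clear();
    }
private:
    std::vector<std::pair<void (*)(void*), void*>> jobs_;
};

// Worker: takes ownership at once, so the task is freed on every exit path.
void handle_packet(void* raw) {
    std::unique_ptr<PacketTask> task(static_cast<PacketTask*>(raw));
    assert(task->seen != nullptr);
    task->seen->push_back(task->port);
}

// Queues one task per port, then runs them; returns the ports the workers saw.
std::vector<int> submit_all(const std::vector<int>& ports) {
    std::vector<int> seen;
    DeferredQueue queue;
    for (int port : ports) {
        // One heap object per task, so it outlives this loop iteration.
        auto task = std::make_unique<PacketTask>();  // value-initialized: buf is zeroed
        task->port = port;
        task->seen = &seen;
        queue.add_work(handle_packet, task.release());  // ownership moves to the worker
    }
    queue.run_all();
    return seen;
}
```

With a stack-allocated PacketTask inside the loop, every queued pointer would refer to storage that is already gone by the time run_all executes, which is the same mistake as passing &args in my original code. Here each worker gets its own object, so submit_all({5000, 5001, 5002}) reports all three ports.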