Search code examples
c++multithreadingtcpnetwork-programming

Recv() returning SOCKET_ERROR when I connect a client to the server instead of blocking and waiting for message


I am relatively new to network programming and multithreading in C++. Currently my recv() call returns an unknown error. I'm not quite sure where the error coming from at the moment and would appreciate some help.

I used putty to connect to the server locally

class Threaded_TCPListener{

    int Threaded_TCPListener::Init()
    {
        // Initializing WinSock
        WSADATA wsData;
        WORD ver = MAKEWORD(2,2);

        int winSock = WSAStartup(ver, &wsData);
        if(winSock != 0)
            return winSock;

        // Creating listening socket
        this->socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        if(this->socket == INVALID_SOCKET)
            return WSAGetLastError();

        // Fill sockaddr with ip addr and port
        sockaddr_in hint;
        hint.sin_family = AF_INET;
        hint.sin_port = htons(this->port);
        inet_pton(AF_INET, this->ipAddress, &hint.sin_addr);

        // Bind hint to socket
        if(bind(this->socket, (sockaddr*)&hint, sizeof(hint)) == SOCKET_ERROR)
            return WSAGetLastError();

        // Start listening on socket
        if(listen(this->socket, SOMAXCONN) == SOCKET_ERROR)
            return WSAGetLastError();

        // Accept first client
        this->createAcceptThread();

        return 0;
    }

    int Threaded_TCPListener::Run()
    {
        bool isRunning = true;

        // Read from all clients
        std::vector<std::thread> threads;
        threads.reserve(this->clients.size());

        // Recv from client sockets
        for (int i=0; i < this->clients.size(); ++i)
        {
            threads.emplace_back(std::thread(&Threaded_TCPListener::receiveFromSocket, this, socket));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

        return 0;
    }

    void Threaded_TCPListener::onMessageReceived(int clientSocket, const char* msg, int length)
    {
        Threaded_TCPListener::broadcastToClients(clientSocket, msg, length);

        std::thread t(&Threaded_TCPListener::receiveFromSocket, this, clientSocket);

        t.detach();

        return;
    }

    void Threaded_TCPListener::sendMessageToClient(int clientSocket, const char * msg, int length)
    {
        send(clientSocket, msg, length, 0);

        return;
    }

    void Threaded_TCPListener::broadcastToClients(int senderSocket, const char * msg, int length)
    {
        std::vector<std::thread> threads;
        threads.reserve(clients.size());

        // Iterate over all clients
        for (int sendSock : this->clients)
        {
            if(sendSock != senderSocket)
                threads.emplace_back(std::thread(&Threaded_TCPListener::sendMessageToClient, this,sendSock, msg, length));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
            t.join();

        return;
    }

    void Threaded_TCPListener::createAcceptThread()
    {
        // Start accepting clients on a new thread
        this->listeningThread = std::thread(&Threaded_TCPListener::acceptClient, this);

        this->listeningThread.detach();

        return;
    }

    void Threaded_TCPListener::acceptClient()
    {
        int client = accept(this->socket, nullptr, nullptr);

        // Error
        if(client == INVALID_SOCKET)
        {
            std::printf("Accept Err: %d\n", WSAGetLastError());
        }
        // Add client to clients queue
        else
        {
            // Add client to queue
            this->clients.emplace(client);

            // Client Connect Confirmation
            onClientConnected(client); // Prints msg on server

            // Create another thread to accept more clients
            this->createAcceptThread();
        }

        return;
    }

    void Threaded_TCPListener::receiveFromSocket(int receivingSocket)
    {
        // Byte storage
        char buff[MAX_BUFF_SIZE];

        // Clear buff
        memset(buff, 0, sizeof(buff));

        // Receive msg
        int bytesRecvd = recv(receivingSocket, buff, MAX_BUFF_SIZE, 0);
        if(bytesRecvd <= 0)
        {
            char err_buff[1024];
            strerror_s(err_buff, bytesRecvd);

            std::cerr << err_buff;
            // Close client
            this->clients.erase(receivingSocket);
            closesocket(receivingSocket);

            onClientDisconnected(receivingSocket); // Prints msg on server
        }
        else
        {
            onMessageReceived(receivingSocket, buff, bytesRecvd);
        }
    }
}

I am trying to create a multithreaded TCP 'server' that will handle incoming clients by having an accept thread continuously running (listening for new connections), and a thread waiting with a recv block for each client connected to the server.


Solution

    1. Your Init looks fine:

      • create socket, bind it, listen on it, start accept thread
    2. In your accept thread's acceptClient looks sort of OK:

      • print some message
      • add the client socket to clients queue
      • create a new accept thread
    3. Your Run makes no sense:

      • create one thread per element in clients to receive from the listening socket

    It looks like you are spawning a new thread for every single socket action. That is a pretty wasteful design. As soon as the thread is done it can go back to doing something else.

    So creating a new accept thread in acceptClient is a waste, you could just loop back to the beginning to ::accept the next client. Like so:

    acceptClient() {
      while (alive) {
        int client = accept(socket, ...);
        createClientHandler(client);
      }
    }
    

    What seems to be missing is spawning a new client thread to service the client socket. You currently do this in Run, but that's before any of the clients are actually accepted. And you do it for the wrong socket! Instead, you should be spawning the receiveFromSocket threads in acceptClient, and passing it the client socket. So that's a bug.

    In your receiveFromSocket you also need not create another thread to receiveFromSocket again -- just loop back to the beginning.

    The biggest concern with this thread-per-action design is that you are spawning sender threads on every incoming message. This means you could actually have several sender threads attempting to ::send on the same TCP socket. That's not very safe.

    The order of calls made to WSASend is also the order in which the buffers are transmitted to the transport layer. WSASend should not be called on the same stream-oriented socket concurrently from different threads, because some Winsock providers may split a large send request into multiple transmissions, and this may lead to unintended data interleaving from multiple concurrent send requests on the same stream-oriented socket.

    https://learn.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsasend

    Similarly, instead of spawning threads in broadcastToClients, I suggest you just spawn one persistent sender thread per client socket in acceptClient (together with the receiveFromSocket thread within some createClientHandler).

    To communicate with the sender threads you should use thread-safe blocking queues. Each sender thread would look like this:

    while (alive) {
      msg = queue.next_message();
      send(client_socket, msg);
    }
    

    Then on message received you just do:

    for (client : clients) {
      client.queue.put_message(msg);
    }
    

    So to summarize, to handle each client you need a structure like this:

    struct Client {
      int client_socket;
      BlockingQueue queue;
    
      // optionally if you want to keep track of your threads
      // to be able to safely clean up
      std::thread recv_thread, send_thread;
    };
    

    Safe cleanup is a whole other story.


    Finally, a remark on this comment in your code:

            // Wait for all threads to finish
            for(std::thread& t : threads)
            {
                t.detach();
            }
    

    That's almost the opposite to what std::thread::detach does: https://en.cppreference.com/w/cpp/thread/thread/detach It allows you to destroy the thread object without having to wait for the thread to finish execution.