Search code examples
c++socketswinapirecv

Asynchronous I/O on C++ Sockets


I'm writing a multithreaded sockets applications (for windows), however, I have a problem when connecting multiple clients. I can send messages from the server to the clients, however, I can only send messages from one client to the server. The other clients can't send messages to the server. I've googled some stuff and found that Overlapping/Asynchronous I/O is the way to go. There's just one problem with that, I don't know how to implement that, so I'm basically asking how I would go about doing this or if this approach is wrong.

The RecieveFromClients() function is what I want to make Asynchronous.

Thanks in advance.

Here is my code:

main.cpp

#include "server.h"

int main()
{
    Server server;

    server.StartServer();

    return 0;
}

Server.h

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iphlpapi.h>
#include <stdio.h>

#include <string>
#include <iostream>
#include <vector>
#include <thread>

#pragma comment(lib, "Ws2_32.lib")

#define DEFAULT_PORT 4566
#define BACKLOG 10

class Server
{
public:
    Server();
    ~Server();
    int StartServer();
private:
    int Socket();
    int Bind();
    int Listen();
    void AcceptConnections();
    int StopServer();
    int CloseClientSocket(int client_num);

    void GetClients(int server_socket, std::vector<int>* clients,
        std::vector<sockaddr_in> *client_addrs);

    void SendToClients();
    void RecieveFromClients(int id);
private:
    sockaddr_in server_addr;
    int connected_clients, counted_clients;
    int server_socket;
    int result;
    int msgSize;
    std::vector<int> clients;
    std::vector<sockaddr_in> client_addrs;
private:
    std::thread get_clients;
    std::thread send_messages;
    std::thread recieve_messages;
};

Server.cpp

#include "server.h"

Server::Server()
    :
    connected_clients(0),
    counted_clients(0),
    server_socket(0),
    result(0)
{
    ZeroMemory(&server_addr, 0);

    WSAData wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        printf("wsastartip falied\n");
    }
}

Server::~Server()
{
    WSACleanup();
}

int Server::StartServer()
{
    if (Socket() != 0)
    {
        return 1;
    }

    if (Bind() != 0)
    {
        return 1;
    }

    if (Listen() != 0)
    {
        return 1;
    }

    AcceptConnections();

    return 0;
}

int Server::Socket()
{
    server_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket == INVALID_SOCKET)
    {
        std::cout << "Failed to create socket" << std::endl;
        return 1;
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(DEFAULT_PORT);
    server_addr.sin_addr.S_un.S_addr = INADDR_ANY;

    return 0;
}

int Server::Bind()
{
    result = bind(server_socket, (sockaddr*)&server_addr,
        sizeof(server_addr));
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to bind socket" << std::endl;
        return 1;
    }

    return 0;
}

int Server::Listen()
{
    result = listen(server_socket, BACKLOG);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Listening failed" << std::endl;
        return 1;
    }

    return 0;
}

void Server::AcceptConnections()
{
    get_clients = std::thread(&Server:: GetClients, this, server_socket,
        &clients, &client_addrs);
    send_messages = std::thread(&Server::SendToClients, this);
    recieve_messages = std::thread(&Server::RecieveFromClients, this, counted_clients);
    get_clients.join();
    send_messages.join();
    recieve_messages.join();
}

int Server::StopServer()
{
    std::terminate();

    for (int client : clients)
    {
        result = closesocket(client);
        if (result == SOCKET_ERROR)
        {
            std::cout << "Failed to close client socket" << std::endl;
            return 1;
        }
    }

    return 0;
}

int Server::CloseClientSocket(int client_num)
{
    result = closesocket(clients[client_num]);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to close client socket" << std::endl;
        return 1;
    }
    return 0;
}

void Server::GetClients(int server_socket, std::vector<int>* clients,
    std::vector<sockaddr_in>* client_addrs)
{
    
    while(true)
    {
        sockaddr_in client_addr = { 0 };
        socklen_t client_addrstrlen = sizeof(client_addr);
        int client;

        client = accept(server_socket, (sockaddr*)&client_addr,
            &client_addrstrlen);

        clients->push_back(client);
        client_addrs->push_back(client_addr);
        ++connected_clients;

        char ip[INET_ADDRSTRLEN] = "";
        char port[100] = "";
        inet_ntop(AF_INET, &client_addr.sin_addr, ip, sizeof(ip));
        std::cout << "Client connected from " << ip << ":" << 
            client_addr.sin_port << std::endl;
    }
}

void Server::SendToClients()
{
    std::string msg;
    do
    {
        msg.clear();
        getline(std::cin, msg);
        if (msg.size() > 255)
        {
            std::cout << "Message must be less than 256 bytes"
                << std::endl;
            continue;
        }

        for (int client : clients)
        {
            int size;
            size = send(client, msg.data(), msg.size(), 0);
            if (size == SOCKET_ERROR)
            {
                std::cout << "Failed to send message to client"
                    << std::endl;
            }
        }
    } while (msg != "exit");

    if (StopServer() != 0)
    {
        std::cout << "Failed to close client sockets" << std::endl;
    }
}

void Server::RecieveFromClients(int id)
{
    std::vector<char> msgBuffer(256);
    do
    {
        
        if (connected_clients > 0)
        {
            msgBuffer.clear();
            msgBuffer.resize(256);
            char ip[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, &client_addrs[id].sin_addr, ip, sizeof(ip));

            if (msgSize = recv(clients[id], msgBuffer.data(),
                msgBuffer.size(), 0) > 0)
            {
                std::cout << ip << ": ";
                for (char c : msgBuffer)
                {
                    if (c != 0)
                    {
                        std::cout << c;
                    }
                }
                std::cout << std::endl;
            }
            else
            {
                if (msgSize == SOCKET_ERROR)
                {
                    std::cout << "Failed to recieve data" << std::endl;
                    break;
                }
                else if (clients[id] > 0)
                {
                    std::cout << "Client " << ip << " has disconnected" << std::endl;
                    CloseClientSocket(0);
                    break;
                }
            }
        }
        else
        {
            continue;
        }
    } while (true);
}


Solution

  • Using Overlapped I/O will be a fairly large change to the design of your code. But fortunately, there is a simpler solution. In your RecieveFromClients() method, you can use select() to determine which client sockets actually have pending data to read before you attempt to read from them. You are using blocking sockets, so a call to recv() will block the calling thread until data is received, so you don’t want to perform a blocking read until you are actually ready to read something.

    Also, since you are not creating a new thread for each accepted client, the id parameter of RecieveFromClients(), and the client_num parameter of CloseClientSocket(), are being used incorrectly and should just be removed completely. The receive function should run a loop over the list of connected clients. And the close function should take the specific socket handle to close.

    That being said, another major design problem with your code is that you are sharing variables and containers across multiple threads, but you are not synchronizing access to any of them. That will cause you big problems in the long run.