Tags: c++, tcp, client-server, boost-asio

TCP client hangs waiting for data after server calls shutdown() and close() (Boost.Asio)


Scenario: a server (asio with C++20 coroutines) sends a fixed amount of data (100 × 4 KiB) in a loop after a connection has been established with a client.

After this, the server should close the connection.

The client simply reads this data and closes the socket after a read call returns 0 bytes.

I have 32 clients establishing connections one after another, each reading the data until it receives EOF.

This is the server implementation:

class TCPServer
{
public:
    explicit TCPServer()
    {
        asio::co_spawn(ioc, accept(), asio::detached);
        ioc.run();
    }

    asio::awaitable<void> accept()
    {
        try
        {
            for (;;)
            {
                auto socket = co_await acceptor.async_accept(asio::use_awaitable);
                asio::co_spawn(ioc, send(std::move(socket)), asio::detached);
            }
        }
        catch (const std::exception& e)
        {
            std::cerr << "Error: " << e.what() << '\n';
            asio::co_spawn(ioc, accept(), asio::detached);
        }
    }

    asio::awaitable<void> send(asio::ip::tcp::socket socket) const
    {
        const auto bufferSize = 4 * 1024;
        auto buf = std::vector<char>(bufferSize, 'A');
       
        for (int32_t i = 0; i < 100; ++i)
        {     
            auto [errorCode, bytesWritten]
                = co_await asio::async_write(socket, asio::const_buffer(buf.data(), buf.size()), asio::as_tuple(asio::use_awaitable));

        }
        boost::system::error_code ec;
        socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
        socket.close();
    }

private:
    asio::io_context ioc;
    asio::ip::tcp::endpoint endpoint{asio::ip::tcp::v4(), 12345};
    asio::ip::tcp::acceptor acceptor{ioc, endpoint};
};
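
Aside: the send() loop above discards errorCode. With as_tuple, async_write reports failure through the tuple instead of throwing, so a broken connection goes unnoticed and the remaining iterations keep writing. A minimal variant of the loop that stops on the first failed write (same buffer and completion token as above) could look like this:

for (int32_t i = 0; i < 100; ++i)
{
    auto [errorCode, bytesWritten] = co_await asio::async_write(
        socket, asio::const_buffer(buf.data(), buf.size()), asio::as_tuple(asio::use_awaitable));
    if (errorCode)
    {
        std::cerr << "write failed: " << errorCode.message() << '\n'; /// surface the error instead of dropping it
        break; /// a dead connection will not recover; stop writing
    }
}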

This is the client code:

void TCPClient::open()
{
    addrinfo hints{};
    addrinfo* result = nullptr;

    hints.ai_family = socketDomain;
    hints.ai_socktype = socketType;
    hints.ai_flags = 0; /// use default behavior
    hints.ai_protocol = 0; /// 0 means getaddrinfo() may return socket addresses with any protocol

    const auto errorCode = getaddrinfo(socketHost.c_str(), socketPort.c_str(), &hints, &result);
    if (errorCode != 0)
    {
        throw CannotOpenClient("Failed getaddrinfo with error: {}", gai_strerror(errorCode));
    }

    /// Try each address until we successfully connect.
    addrinfo* candidate = result;
    while (candidate != nullptr)
    {
        sockfd = socket(candidate->ai_family, candidate->ai_socktype, candidate->ai_protocol);
        if (sockfd == -1)
        {
            candidate = candidate->ai_next;
            continue;
        }

        constexpr static timeval Timeout{0, TCP_SOCKET_DEFAULT_TIMEOUT.count()};
        setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &Timeout, sizeof(Timeout));
        connection = connect(sockfd, candidate->ai_addr, candidate->ai_addrlen);

        if (connection != -1)
        {
            break; /// success
        }
        close();
        candidate = candidate->ai_next; /// advance; otherwise a failed connect retries the same address forever
    }

    const bool connected = candidate != nullptr;
    freeaddrinfo(result); /// free the whole list via the head pointer, not the current node

    if (!connected)
    {
        throw CannotOpenClient("Could not connect to {}:{}", socketHost, socketPort);
    }
}

bool TCPClient::fillBuffer(Buffer& buffer, size_t& numReceivedBytes)
{
    bool readWasValid = true;

    const size_t rawTBSize = buffer.getBufferSize();
    while (numReceivedBytes < rawTBSize)
    {
        const ssize_t bufferSizeReceived = read(sockfd, buffer.getBuffer() + numReceivedBytes, rawTBSize - numReceivedBytes);
        if (bufferSizeReceived == INVALID_RECEIVED_BUFFER_SIZE)
        {
            /// read returned -1: an error occurred during the read.
            readWasValid = false;
            break;
        }
        if (bufferSizeReceived == EOF_RECEIVED_BUFFER_SIZE)
        {
            /// read returned 0: the peer closed the connection, so no more data will arrive.
            if (numReceivedBytes == 0)
            {
                readWasValid = false;
            }
            break;
        }
        /// Count the bytes only after ruling out the -1/0 sentinels, so an error cannot corrupt the tally.
        numReceivedBytes += bufferSizeReceived;
    }
    ++generatedBuffers;
    /// Loop while we haven't received any bytes yet and we can still read from the socket.
    return numReceivedBytes == 0 and readWasValid;
}

This is the main processing loop on the client side.

Now, on some occasions, the client never receives a FIN and remains stuck in the read syscall, waiting for more data to arrive.

What might be the reason for this?

I call shutdown and close on the socket to terminate the connection after the data has been sent, and I would expect the OS TCP stack to handle the final handshake messages between server and client.

This only happens when client and server run on two different physical machines, never when both run on the same host.


Solution

  • The async_write call does not throw errors. Also, when I use asio on the client side the issue does not occur; oddly enough, it only happens when using blocking I/O.

    The emphasized part piqued my interest. So I tried it. Then I found that you might be running out of file descriptors. With an async demo client:

    Single Source Demo

    Much simplified, containing both server and client:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <syncstream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    constexpr uint16_t port = 12345;
    
    auto info() { return std::osyncstream(std::cout); }
    auto debug() { return std::osyncstream(std::cerr); }
    
    asio::awaitable<void> send400k(tcp::socket socket) {
        std::vector buf(4 * 1024, 'A');
    
        [[maybe_unused]] auto [ec, n] =
            co_await async_write(socket, std::vector(100, asio::buffer(buf)), asio::as_tuple);
    
        if (!ec)
            socket.shutdown(tcp::socket::shutdown_both, ec);
        info() << "send400k: " << ec.message() << std::endl;
    }
    
    asio::awaitable<void> read_all(tcp::endpoint ep) try {
        tcp::socket socket{co_await asio::this_coro::executor};
    
        co_await socket.async_connect(ep);
    
        std::vector<char> buf;
        auto [ec, n] = co_await async_read(socket, asio::dynamic_buffer(buf), asio::as_tuple);
        info() << "read: n=" << n << ", " << ec.message() << std::endl;
        assert(!ec.failed() || ec == asio::error::eof);
        assert(n == buf.size());
        assert(n == 400 * 1024);
        assert(std::all_of(buf.begin(), buf.end(), [](char c) { return c == 'A'; }));
    } catch (boost::system::system_error const& se) {
        debug() << "read_all: " << se.code().message() << std::endl;
    }
    
    asio::awaitable<void> listener() try {
        auto ex = co_await asio::this_coro::executor;
        tcp::acceptor acceptor{ex, {{}, port}};
        for (;;)
            co_spawn(ex, send400k(co_await acceptor.async_accept()), asio::detached);
    } catch (boost::system::system_error const& se) {
        debug() << "listener: " << se.code().message() << std::endl;
    }
    
    #include <set>
    int main(int argc, char** argv) {
        asio::thread_pool ioc;
        if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
            co_spawn(ioc, listener(), asio::detached);
        else
            for (int i = 0; i < 3200; ++i)
                co_spawn(ioc, read_all({{}, port}), asio::detached);
        ioc.join();
    }
    

    Now, usually on Linux ulimit -n is 1024. You will observe that no more than 1018 connections are accepted each time (a handful of descriptors are already in use for the standard streams and asio's internals). Increasing the limit removes the behaviour.
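
    For reference, a process can inspect or raise its own soft limit with the POSIX getrlimit/setrlimit calls. A small standalone sketch (the target value of 4096 is an arbitrary example):

    #include <sys/resource.h>
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    
    int main() {
        rlimit lim{};
        if (getrlimit(RLIMIT_NOFILE, &lim) == 0)
            std::printf("soft=%ju hard=%ju\n", std::uintmax_t(lim.rlim_cur), std::uintmax_t(lim.rlim_max));
    
        // Raise the soft limit, capped at the hard limit (raising the hard limit requires privileges).
        lim.rlim_cur = std::min<rlim_t>(4096, lim.rlim_max);
        if (setrlimit(RLIMIT_NOFILE, &lim) != 0)
            std::perror("setrlimit");
    }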

    In other words: check that your acceptor doesn't keep thrashing through the limit. Consider a graceful fallback instead of blindly continuing to accept when out of resources.
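
    For illustration, a sketch of what such a fallback could look like in the demo above, reusing its send400k and debug helpers (asio::error::no_descriptors corresponds to EMFILE on POSIX; the 100ms back-off is an arbitrary choice):

    asio::awaitable<void> listener_with_backoff() {
        auto ex = co_await asio::this_coro::executor;
        tcp::acceptor acceptor{ex, {{}, port}};
        for (;;) {
            auto [ec, socket] = co_await acceptor.async_accept(asio::as_tuple);
            if (!ec) {
                co_spawn(ex, send400k(std::move(socket)), asio::detached);
            } else if (ec == asio::error::no_descriptors) {
                // Out of file descriptors: back off briefly instead of spinning on accept
                asio::steady_timer backoff{ex, std::chrono::milliseconds(100)};
                co_await backoff.async_wait(asio::as_tuple);
            } else {
                debug() << "listener: " << ec.message() << std::endl;
                break; // give up on unrecoverable acceptor errors
            }
        }
    }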