Search code examples
c++ · boost · tcp · boost-asio

Deadlock in local client/server environment with 10K concurrent TCP connections


In this simple implementation, a server listener coroutine is spawned onto an asio::io_context to accept incoming connections, spawning a session on accept. At the same time 10K concurrent clients establish connections to the server. The server writes 1KB chunks of data to each client in a loop. The main thread waits until all connections are established, sleeping for some time to let the server/clients exchange data. Then, it sets the flag stop to true, at which point the sessions will return, destroying the underlying socket, which terminates the connection on the clients. However, there are always some connections that will not be closed.

using asio::ip::tcp;
using Executor = asio::io_context::executor_type;
// Raised by main() to ask every server session to stop writing and return.
std::atomic_bool stop = false;
// Count of clients whose TCP connect completed (incremented client-side only).
std::atomic_int32_t connected = 0;
int numConnections = 10'000;

namespace Client {
    // Client coroutine: connect to the local server, then read 1KB chunks
    // forever; exits only when an exception (EOF/reset) unwinds the loop.
    asio::awaitable<void, Executor> client() try {
        const auto ex = co_await asio::this_coro::executor;
        tcp::socket socket{ex};
        tcp::resolver res{ex};
        // NOTE: resolve() is the synchronous overload — it blocks the io thread.
        co_await async_connect(socket, res.resolve("localhost", "12345"), asio::deferred);
        ++connected;

        for (std::vector<char> buf(1024);;)
            co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()), asio::deferred);

    } catch (asio::system_error const &se) {
        // NOTE(review): this decrements even when the failure happened before
        // ++connected ran (e.g. a failed connect), which can skew the counter.
        --connected;
    }
}
namespace Server {
    // Per-connection coroutine: write 1KB of 'A's in a tight loop until the
    // global stop flag is raised; the socket closes when the frame is destroyed.
    asio::awaitable<void, Executor> session(tcp::socket socket) try {
        for (static const std::vector payload(1024, 'A'); !stop;) {
            // as_tuple(deferred) delivers errors in the (discarded) result tuple
            // instead of throwing — write failures are silently ignored here, so
            // the catch clause below never fires for this call.
            co_await async_write(socket, asio::buffer(payload),
                                 asio::as_tuple(asio::deferred));
        }
    } catch (asio::system_error &err) {
        log() << err.code().message() << std::endl;
    }

    // Accept loop: spawn a detached session coroutine for every accepted socket.
    asio::awaitable<void, Executor> listener() try {
        const auto executor = co_await asio::this_coro::executor;

        for (tcp::acceptor acceptor{executor, {{}, 12345}};;) {
            co_spawn(executor, session(co_await acceptor.async_accept(asio::deferred)), asio::detached);
        }
    } catch (asio::system_error const &se) {
        // Reached when main() emits the terminal cancellation signal.
        log() << se.code().message() << std::endl;
    }
}
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

#include <ranges>
int main() {
    asio::io_context ctx;
    asio::cancellation_signal signal;

    // Listener is bound to a cancellation slot so main can tear it down later.
    co_spawn(ctx, Server::listener(), asio::bind_cancellation_slot(signal.slot(), asio::detached));
    std::ranges::for_each(std::views::iota(0, numConnections), [&](int) {
        co_spawn(ctx, Client::client(), asio::detached);
    });

    // One worker thread runs all coroutines; jthread joins on destruction.
    std::jthread th([&ctx] { ctx.run(); });

    // Poll (1s granularity) until every client reports a completed connect.
    while (connected < numConnections) {
        sleep_for(1s);
        log() << "Clients connected: " << connected << std::endl;
    }

    // Let data flow for a while, then stop sessions and cancel the listener.
    sleep_for(5s);
    stop = true;
    signal.emit(asio::cancellation_type::terminal);

    // Wait for all clients to observe the closed connections and unwind.
    while (connected > 0) {
        sleep_for(1s);
        log() << "Clients still connected: " << connected << std::endl;
    }
}

The output I get looks like the following:

main:74 Clients connected: 4097
main:74 Clients connected: 8107
main:74 Clients connected: 8107
main:74 Clients connected: 9174
main:74 Clients connected: 9174
client:25 All clients connected
main:74 Clients connected: 10000
listener:53 Operation aborted.
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047

At 3047 clients, it seems that we are stuck. Since the connection is only terminated from the server session, the issue must lie in the fact that the coroutine is suspended in the async_write call, never being resumed from there. This should mean that there is no data to write, which is not clear to me since the clients should read the data, making buffer space available for the server to write to. Therefore, at some point, I would expect the async_write to succeed, allowing the session to exit, terminating the connection. Why do we run into this deadlock scenario here?


Solution

  • You only count the number of clients connecting (on a TCP level).

    "10K connections" are on that level: the kernel TCP stack accepting a connection (and usually putting it in a back-log).

    3047 clients spells the same issue as before: open descriptor limits. Did you modify the limits to see that it correlates?

    If you make your stats symmetrical:

    Live On Coliru

    //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
    #include <boost/asio.hpp>
    #include <iostream>
    #include <syncstream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using Context      = asio::thread_pool;
    using Executor     = Context::executor_type;
    using system_error = boost::system::system_error;
    
    constexpr uint16_t port       = 12345;
    constexpr int      numClients = 10'000;
    std::atomic_bool   s_stop     = false;
    
    #include <experimental/scope>
    using std::experimental::scope_exit;
    // Symmetric connection stats: live client sessions and live server sessions
    // are counted separately so the two sides can be compared.
    std::atomic_uint32_t s_clients_connected = 0, s_server_connected = 0;
    // RAII guards: the lambda's init-capture increments the counter when the
    // guard is constructed; scope_exit decrements it when the scope unwinds.
    static auto client_guard() { return scope_exit([up = ++s_clients_connected] { --s_clients_connected; }); }
    static auto server_guard() { return scope_exit([up = ++s_server_connected]  { --s_server_connected;  }); }
    
    #define log() std::osyncstream(std::cout) << __FUNCTION__ << ':' << __LINE__ << ' '
    
    // Asynchronous sleep: suspend this coroutine for duration d on its own
    // executor without blocking any thread.
    static asio::awaitable<void, Executor> wait_for(auto d) {
        auto executor = co_await asio::this_coro::executor;
        asio::steady_timer timer{executor, d};
        co_await timer.async_wait();
    }
    
    namespace Client {
        // Connect to the server, count this client as live via the guard, then
        // read 1KB chunks until an error (EOF/reset) unwinds through the catch.
        static asio::awaitable<void, Executor> client() try {
            tcp::socket       socket{co_await asio::this_coro::executor};
            std::vector<char> buf(1024);
            co_await socket.async_connect({{}, port});
    
            // Guard increments s_clients_connected now, decrements on scope exit.
            for (auto guard = client_guard();;)
                co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()));
        } catch (system_error const& se) {
            log() << se.code().message() << std::endl;
        }
    } // namespace Client
    
    namespace Server {
        // Per-connection writer: registers in the server-side stats, then
        // streams 1KB payloads until s_stop is raised or a write throws.
        static asio::awaitable<void, Executor> session(tcp::socket socket) try {
            auto guard = server_guard();
            // Unlike the question's version, errors throw here (no as_tuple),
            // so a failed write exits the loop via the catch clause.
            for (static std::vector const payload(1024, 'A'); !s_stop;)
                co_await async_write(socket, asio::buffer(payload)/*, asio::as_tuple*/);
        } catch (system_error const& err) {
            log() << err.code().message() << std::endl;
        }
    
        // Accept loop: one detached session coroutine per accepted socket.
        static asio::awaitable<void, Executor> listener() try {
            auto ex = co_await asio::this_coro::executor;
            for (tcp::acceptor acceptor{ex, {{}, port}};;)
                co_spawn(ex, session(co_await acceptor.async_accept()), asio::detached);
        } catch (system_error const& se) {
            log() << se.code().message() << std::endl;
        }
    } // namespace Server
    
    using namespace std::chrono_literals;
    using std::this_thread::sleep_for;
    
    // Print a one-line snapshot of both counters.
    // NOTE(review): the line number is hard-coded as 62 (matching the sample
    // output below) rather than using __LINE__.
    #define stats()                                                                                              \
        do {                                                                                                     \
            std ::osyncstream(std ::cout)                                                                        \
                << __FUNCTION__ << ':' << 62 << ' ' << "Clients/Server connected: " << s_clients_connected       \
                << "/" << s_server_connected << std::endl;                                                       \
        } while (0)
    
    // Poll loop: print stats, sleep 100ms, repeat while cond holds.
    #define monitor(cond)                                                                                        \
        do {                                                                                                     \
            stats();                                                                                             \
            sleep_for(100ms);                                                                                    \
        } while (cond);
    
    // Debug helper (unused here): run a single ready handler on the context.
    void debug_step(asio::io_context& ioc) { ioc.run_one(); }
    
    int main() {
        Context ctx(1); // thread_pool with a single worker thread
    
        asio::cancellation_signal sig;
        // Note: Server::listener is passed as a callable (no parentheses);
        // co_spawn invokes it to obtain the awaitable.
        co_spawn(ctx, Server::listener, bind_cancellation_slot(sig.slot(), asio::detached));
        sleep_for(10ms);
    
        for (auto i = numClients; i--;)
            co_spawn(ctx, Client::client(), asio::detached);
    
        // Wait until all clients report a completed TCP connect (client-side only).
        monitor(s_clients_connected < numClients);
    
        s_stop = true;
    
        // Wait for both client and server sessions to fully unwind.
        monitor(s_clients_connected || s_server_connected);
    
        sig.emit(asio::cancellation_type::all);
        ctx.join();
    
        stats();
    }
    

    You will find that you are pulling the plug way before all the connections have been actively accepted in Asio userland (output through uniq -c):

          2 main:62 Clients/Server connected: 2/1
          1 main:62 Clients/Server connected: 4100/178
          1 main:62 Clients/Server connected: 4100/322
          1 main:62 Clients/Server connected: 4100/430
          1 main:62 Clients/Server connected: 4100/519
          1 main:62 Clients/Server connected: 4100/599
            ...
          1 main:62 Clients/Server connected: 9651/2958
          1 main:62 Clients/Server connected: 10000/2967
       3784 client:36 End of file
          1 main:62 Clients/Server connected: 6216/0
       3254 client:36 End of file
          6 main:62 Clients/Server connected: 2962/0
        590 client:36 End of file
         21 main:62 Clients/Server connected: 2372/0
        590 client:36 End of file
         21 main:62 Clients/Server connected: 1782/0
        868 client:36 End of file
        307 main:62 Clients/Server connected: 914/0
    

    A more accurate approach would be to wait for sessions to be fully active:

    monitor(std::min(s_clients_connected, s_server_connected) < numClients);
    
    s_stop = true;
    sig.emit(asio::cancellation_type::all);
    
    monitor(s_clients_connected || s_server_connected);
    

    Taming The Herd

    To prevent the server from being starved by the Thundering Herd of clients, consider "holding" the herd before they start transferring bulk data:

    Live On Coliru

    //#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
    #include <boost/asio.hpp>
    #include <iostream>
    #include <syncstream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using Context      = asio::thread_pool;
    using Executor     = Context::executor_type;
    using system_error = boost::system::system_error;
    using namespace std::chrono_literals;
    
    constexpr uint16_t port       = 12345;
    constexpr int      numClients = 10'000;
    std::atomic_bool   s_stop     = false;
    
    #include <experimental/scope>
    using std::experimental::scope_exit;
    // Symmetric connection stats: live client sessions and live server sessions
    // are counted separately so the two sides can be compared.
    std::atomic_uint32_t s_clients_connected = 0, s_server_connected = 0;
    // RAII guards: the lambda's init-capture increments the counter when the
    // guard is constructed; scope_exit decrements it when the scope unwinds.
    static auto client_guard() { return scope_exit([up = ++s_clients_connected] { --s_clients_connected; }); }
    static auto server_guard() { return scope_exit([up = ++s_server_connected]  { --s_server_connected;  }); }
    
    static auto const s_start = std::chrono::steady_clock::now();
    #define log()                                                                                                \
        std::osyncstream(std::cout) << std::setw(6) << (std::chrono::steady_clock::now() - s_start) / 1ms        \
                                    << "ms " << __FUNCTION__ << ':' << __LINE__ << ' '
    
    // Asynchronous sleep: suspend this coroutine for duration d on its own
    // executor without blocking any thread.
    static asio::awaitable<void, Executor> wait_for(auto d) {
        auto executor = co_await asio::this_coro::executor;
        asio::steady_timer timer{executor, d};
        co_await timer.async_wait();
    }
    
    namespace Client {
        // Connect, count this client as live, then park on the shared `hold`
        // timer until the orchestrator releases the herd; after that, read
        // 1KB chunks forever until an error unwinds through the catch.
        static asio::awaitable<void, Executor> client(auto& hold) try {
            tcp::socket       socket{co_await asio::this_coro::executor};
            std::vector<char> buf(1024);
            co_await socket.async_connect({{}, port});
    
            auto guard = client_guard();
            // as_tuple keeps the expected cancellation error from hold.cancel()
            // from throwing — the (ignored) tuple carries the error code.
            co_await hold.async_wait(asio::as_tuple);
    
            for (;;)
                co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()));
        } catch (system_error const& se) {
            // log() << se.code().message() << std::endl;
        }
    } // namespace Client
    
    namespace Server {
        // Per-connection writer: registers in the server-side stats, waits on
        // the shared `hold` timer alongside the clients, then streams 1KB
        // payloads until s_stop is raised or a write throws.
        static asio::awaitable<void, Executor> session(tcp::socket socket, auto& hold) try {
            auto guard = server_guard();
            // as_tuple turns the cancellation error from hold.cancel() into a
            // returned (ignored) tuple rather than an exception.
            co_await hold.async_wait(asio::as_tuple);
            for (static std::vector const payload(1024, 'A'); !s_stop;)
                co_await async_write(socket, asio::buffer(payload)/*, asio::as_tuple*/);
        } catch (system_error const& err) {
            log() << err.code().message() << std::endl;
        }
    
        // Accept loop: each accepted socket gets a detached session sharing the
        // same hold timer.
        static asio::awaitable<void, Executor> listener(auto& hold) try {
            auto ex = co_await asio::this_coro::executor;
            for (tcp::acceptor acceptor{ex, {{}, port}};;)
                co_spawn(ex, session(co_await acceptor.async_accept(), hold), asio::detached);
        } catch (system_error const& se) {
            log() << se.code().message() << std::endl;
        }
    } // namespace Server
    
    // Print a timestamped snapshot of both counters.
    // NOTE(review): the line number is hard-coded as 62 rather than __LINE__.
    #define stats()                                                                                              \
        do {                                                                                                     \
            std ::osyncstream(std ::cout)                                                                        \
                << std::setw(6) << (std::chrono::steady_clock::now() - s_start) / 1ms << "ms " << __FUNCTION__   \
                << ':' << 62 << ' ' << "Clients/Server connected: " << s_clients_connected << "/"                \
                << s_server_connected << std::endl;                                                              \
        } while (0)
    
    // Coroutine-only poll: print stats then co_await a 1s timer, repeating
    // while cond holds (usable only inside a coroutine, e.g. co_main).
    #define monitor(cond)                                                                                        \
        do {                                                                                                     \
            stats();                                                                                             \
            co_await wait_for(1s);                                                                                    \
        } while (cond);
    
    // Orchestrator coroutine: start the listener, launch all clients, hold the
    // herd until both sides report full session counts, then run bulk transfer
    // for 5s and shut everything down.
    asio::awaitable<void, Executor> co_main() {
        auto ex = co_await asio::this_coro::executor;
    
        asio::cancellation_signal sig;
        // Timer that never expires on its own; cancel() releases all waiters at once.
        asio::steady_timer        hold(ex, asio::steady_timer::time_point::max());
        co_spawn(ex, Server::listener(hold), bind_cancellation_slot(sig.slot(), asio::detached));
        co_await wait_for(10ms);
    
        for (auto i = numClients; i--;)
            co_spawn(ex, Client::client(hold), asio::detached);
    
        // Wait until BOTH sides report full session counts, not just TCP connects.
        monitor(std::min(s_clients_connected, s_server_connected) < numClients);
    
        hold.cancel(); // release the herds
        co_await wait_for(5s);
        s_stop = true;
        sig.emit(asio::cancellation_type::all);
    
        // Wait for both sides to fully unwind.
        monitor(s_clients_connected || s_server_connected);
    
        stats();
    }
    
    // Entry point: run co_main on a thread_pool (multi-threaded by default;
    // uncomment the (1) to pin it to a single worker).
    int main() {
        Context ctx/*(1)*/;
        co_spawn(ctx, co_main(), asio::detached);
        ctx.join();
    }
    

    With sample output on my system:

    sehe@workstation:~/Projects/stackoverflow$ time (ulimit -n 32000; ./build/sotest)
       218ms co_main:62 Clients/Server connected: 10000/10000
      6899ms co_main:62 Clients/Server connected: 10000/9995
      6914ms listener:62 Operation canceled
      7899ms co_main:62 Clients/Server connected: 0/0
    
    real    0m7.907s
    user    0m12.543s
    sys 1m8.366s