In this simple implementation, a server listener
coroutine is spawned onto an asio::io_context
to accept incoming connections, spawning a session
on accept.
At the same time, 10K concurrent clients establish connections to the server.
The server writes 1KB chunks of data to each client in a loop.
The main thread waits until all connections are established, then sleeps for some time to let the server and clients exchange data.
Then it sets the stop flag
to true, at which point the sessions
return, destroying the underlying sockets, which terminates the connections on the client side.
However, there are always some connections that will not be closed.
// Includes reconstructed to make the snippet self-contained (standalone Asio
// assumed, matching the unqualified asio namespace and asio::system_error).
#include <asio.hpp>
#include <atomic>
#include <iostream>
#include <syncstream>
#include <vector>
using asio::ip::tcp;
using Executor = asio::io_context::executor_type;
// Minimal log() helper matching the function:line output shown further down.
#define log() std::osyncstream(std::cout) << __FUNCTION__ << ':' << __LINE__ << ' '
std::atomic_bool stop = false;
std::atomic_int32_t connected = 0;
int numConnections = 10'000;
namespace Client {
asio::awaitable<void, Executor> client() try {
const auto ex = co_await asio::this_coro::executor;
tcp::socket socket{ex};
tcp::resolver res{ex};
co_await async_connect(socket, res.resolve("localhost", "12345"), asio::deferred);
++connected;
for (std::vector<char> buf(1024);;)
co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()), asio::deferred);
} catch (asio::system_error const &se) {
--connected;
}
}
namespace Server {
asio::awaitable<void, Executor> session(tcp::socket socket) try {
for (static const std::vector payload(1024, 'A'); !stop;) {
co_await async_write(socket, asio::buffer(payload),
asio::as_tuple(asio::deferred));
}
} catch (asio::system_error &err) {
log() << err.code().message() << std::endl;
}
asio::awaitable<void, Executor> listener() try {
const auto executor = co_await asio::this_coro::executor;
for (tcp::acceptor acceptor{executor, {{}, 12345}};;) {
co_spawn(executor, session(co_await acceptor.async_accept(asio::deferred)), asio::detached);
}
} catch (asio::system_error const &se) {
log() << se.code().message() << std::endl;
}
}
#include <algorithm>
#include <chrono>
#include <ranges>
#include <thread>
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
int main() {
asio::io_context ctx;
asio::cancellation_signal signal;
co_spawn(ctx, Server::listener(), asio::bind_cancellation_slot(signal.slot(), asio::detached));
std::ranges::for_each(std::views::iota(0, numConnections), [&](int) {
co_spawn(ctx, Client::client(), asio::detached);
});
std::jthread th([&ctx] { ctx.run(); });
while (connected < numConnections) {
sleep_for(1s);
log() << "Clients connected: " << connected << std::endl;
}
sleep_for(5s);
stop = true;
signal.emit(asio::cancellation_type::terminal);
while (connected > 0) {
sleep_for(1s);
log() << "Clients still connected: " << connected << std::endl;
}
}
The output I get looks like the following:
main:74 Clients connected: 4097
main:74 Clients connected: 8107
main:74 Clients connected: 8107
main:74 Clients connected: 9174
main:74 Clients connected: 9174
client:25 All clients connected
main:74 Clients connected: 10000
listener:53 Operation aborted.
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
At 3047 clients, it seems that we are stuck.
Since the connection is only ever terminated from the server session, the issue must be that the coroutine is suspended in the async_write
call and never resumed from there.
That would mean the write cannot complete, which is not clear to me, since the clients keep reading and should thereby free up buffer space for the server to write into. Therefore, at some point, I would expect the async_write
to complete, allowing the session to exit and terminate the connection.
Why do we run into this deadlock scenario here?
You only count the number of clients connecting (at the TCP level).
The "10K connections" exist at that level only: the kernel TCP stack accepts each connection (and usually parks it in the listen backlog) before your code ever sees it.
Getting stuck at 3047 clients spells the same issue as before: open descriptor limits. Did you modify the limits to see whether the number correlates?
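As a sanity check, assuming a POSIX system, a hypothetical helper along these lines (again not part of the programs below) verifies and raises the limit from inside the process:
#include <sys/resource.h>
#include <cstdio>
// Hypothetical helper: report the per-process descriptor limit and try to
// raise the soft limit to the hard cap, so the stuck-connection count can be
// correlated with RLIMIT_NOFILE without an external ulimit invocation.
static void report_fd_limit() {
rlimit rl{};
if (::getrlimit(RLIMIT_NOFILE, &rl) == 0)
std::printf("fd limit: soft=%llu hard=%llu\n", (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);
rl.rlim_cur = rl.rlim_max; // may fail to take effect without privileges
::setrlimit(RLIMIT_NOFILE, &rl);
}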
If you make your stats symmetrical:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <iostream>
#include <syncstream>
#include <thread>
namespace asio = boost::asio;
using asio::ip::tcp;
using Context = asio::thread_pool;
using Executor = Context::executor_type;
using system_error = boost::system::system_error;
constexpr uint16_t port = 12345;
constexpr int numClients = 10'000;
std::atomic_bool s_stop = false;
#include <experimental/scope>
using std::experimental::scope_exit;
std::atomic_uint32_t s_clients_connected = 0, s_server_connected = 0;
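// The init-capture bumps the counter as a side effect when the guard is
// created; the scope_exit body undoes it when the guard is destroyed.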
static auto client_guard() { return scope_exit([up = ++s_clients_connected] { --s_clients_connected; }); }
static auto server_guard() { return scope_exit([up = ++s_server_connected] { --s_server_connected; }); }
#define log() std::osyncstream(std::cout) << __FUNCTION__ << ':' << __LINE__ << ' '
static asio::awaitable<void, Executor> wait_for(auto d) {
co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait();
}
namespace Client {
static asio::awaitable<void, Executor> client() try {
tcp::socket socket{co_await asio::this_coro::executor};
std::vector<char> buf(1024);
co_await socket.async_connect({{}, port});
for (auto guard = client_guard();;)
co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()));
} catch (system_error const& se) {
log() << se.code().message() << std::endl;
}
} // namespace Client
namespace Server {
static asio::awaitable<void, Executor> session(tcp::socket socket) try {
auto guard = server_guard();
for (static std::vector const payload(1024, 'A'); !s_stop;)
co_await async_write(socket, asio::buffer(payload)/*, asio::as_tuple*/);
} catch (system_error const& err) {
log() << err.code().message() << std::endl;
}
static asio::awaitable<void, Executor> listener() try {
auto ex = co_await asio::this_coro::executor;
for (tcp::acceptor acceptor{ex, {{}, port}};;)
co_spawn(ex, session(co_await acceptor.async_accept()), asio::detached);
} catch (system_error const& se) {
log() << se.code().message() << std::endl;
}
} // namespace Server
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
#define stats() \
do { \
std::osyncstream(std::cout) \
<< __FUNCTION__ << ':' << __LINE__ << ' ' << "Clients/Server connected: " << s_clients_connected \
<< "/" << s_server_connected << std::endl; \
} while (0)
#define monitor(cond) \
do { \
stats(); \
sleep_for(100ms); \
} while (cond);
int main() {
Context ctx(1);
asio::cancellation_signal sig;
co_spawn(ctx, Server::listener(), bind_cancellation_slot(sig.slot(), asio::detached));
sleep_for(10ms);
for (auto i = numClients; i--;)
co_spawn(ctx, Client::client(), asio::detached);
monitor(s_clients_connected < numClients);
s_stop = true;
monitor(s_clients_connected || s_server_connected);
sig.emit(asio::cancellation_type::all);
ctx.join();
stats();
}
You will find that you are pulling the plug way before all the connections have been actively accepted in Asio userland (output piped through uniq -c):
2 main:62 Clients/Server connected: 2/1
1 main:62 Clients/Server connected: 4100/178
1 main:62 Clients/Server connected: 4100/322
1 main:62 Clients/Server connected: 4100/430
1 main:62 Clients/Server connected: 4100/519
1 main:62 Clients/Server connected: 4100/599
...
1 main:62 Clients/Server connected: 9651/2958
1 main:62 Clients/Server connected: 10000/2967
3784 client:36 End of file
1 main:62 Clients/Server connected: 6216/0
3254 client:36 End of file
6 main:62 Clients/Server connected: 2962/0
590 client:36 End of file
21 main:62 Clients/Server connected: 2372/0
590 client:36 End of file
21 main:62 Clients/Server connected: 1782/0
868 client:36 End of file
307 main:62 Clients/Server connected: 914/0
A more accurate approach would be to wait for sessions to be fully active:
monitor(std::min(s_clients_connected, s_server_connected) < numClients);
s_stop = true;
sig.emit(asio::cancellation_type::all);
monitor(s_clients_connected || s_server_connected);
To prevent the server from being starved by the Thundering Herd of clients, consider "holding" the herd before the clients start transferring bulk data:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
#include <syncstream>
namespace asio = boost::asio;
using asio::ip::tcp;
using Context = asio::thread_pool;
using Executor = Context::executor_type;
using system_error = boost::system::system_error;
using namespace std::chrono_literals;
constexpr uint16_t port = 12345;
constexpr int numClients = 10'000;
std::atomic_bool s_stop = false;
#include <experimental/scope>
using std::experimental::scope_exit;
std::atomic_uint32_t s_clients_connected = 0, s_server_connected = 0;
static auto client_guard() { return scope_exit([up = ++s_clients_connected] { --s_clients_connected; }); }
static auto server_guard() { return scope_exit([up = ++s_server_connected] { --s_server_connected; }); }
static auto const s_start = std::chrono::steady_clock::now();
#define log() \
std::osyncstream(std::cout) << std::setw(6) << (std::chrono::steady_clock::now() - s_start) / 1ms \
<< "ms " << __FUNCTION__ << ':' << __LINE__ << ' '
static asio::awaitable<void, Executor> wait_for(auto d) {
co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait();
}
namespace Client {
static asio::awaitable<void, Executor> client(auto& hold) try {
tcp::socket socket{co_await asio::this_coro::executor};
std::vector<char> buf(1024);
co_await socket.async_connect({{}, port});
auto guard = client_guard();
co_await hold.async_wait(asio::as_tuple);
for (;;)
co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()));
} catch (system_error const& se) {
// log() << se.code().message() << std::endl;
}
} // namespace Client
namespace Server {
static asio::awaitable<void, Executor> session(tcp::socket socket, auto& hold) try {
auto guard = server_guard();
co_await hold.async_wait(asio::as_tuple);
for (static std::vector const payload(1024, 'A'); !s_stop;)
co_await async_write(socket, asio::buffer(payload)/*, asio::as_tuple*/);
} catch (system_error const& err) {
log() << err.code().message() << std::endl;
}
static asio::awaitable<void, Executor> listener(auto& hold) try {
auto ex = co_await asio::this_coro::executor;
for (tcp::acceptor acceptor{ex, {{}, port}};;)
co_spawn(ex, session(co_await acceptor.async_accept(), hold), asio::detached);
} catch (system_error const& se) {
log() << se.code().message() << std::endl;
}
} // namespace Server
#define stats() \
do { \
std::osyncstream(std::cout) \
<< std::setw(6) << (std::chrono::steady_clock::now() - s_start) / 1ms << "ms " << __FUNCTION__ \
<< ':' << __LINE__ << ' ' << "Clients/Server connected: " << s_clients_connected << "/" \
<< s_server_connected << std::endl; \
} while (0)
#define monitor(cond) \
do { \
stats(); \
co_await wait_for(1s); \
} while (cond);
asio::awaitable<void, Executor> co_main() {
auto ex = co_await asio::this_coro::executor;
asio::cancellation_signal sig;
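// A timer that never expires on its own: it acts as a latch that every
// session and client awaits, released for all of them at once by hold.cancel().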
asio::steady_timer hold(ex, asio::steady_timer::time_point::max());
co_spawn(ex, Server::listener(hold), bind_cancellation_slot(sig.slot(), asio::detached));
co_await wait_for(10ms);
for (auto i = numClients; i--;)
co_spawn(ex, Client::client(hold), asio::detached);
monitor(std::min(s_clients_connected, s_server_connected) < numClients);
hold.cancel(); // release the herds
co_await wait_for(5s);
s_stop = true;
sig.emit(asio::cancellation_type::all);
monitor(s_clients_connected || s_server_connected);
stats();
}
int main() {
Context ctx/*(1)*/;
co_spawn(ctx, co_main(), asio::detached);
ctx.join();
}
With sample output on my system:
sehe@workstation:~/Projects/stackoverflow$ time (ulimit -n 32000; ./build/sotest)
218ms co_main:62 Clients/Server connected: 10000/10000
6899ms co_main:62 Clients/Server connected: 10000/9995
6914ms listener:62 Operation canceled
7899ms co_main:62 Clients/Server connected: 0/0
real 0m7.907s
user 0m12.543s
sys 1m8.366s