I had posted this earlier but hadn't posted a proper minimum working example, apologies for that.
The problem is quite clear. Starting 440 websocket connections with Boost ASIO eats up about 220,000 file descriptors.
Here's how I come to that conclusion:
I sort the processes by the number of file descriptors they use:
lsof -n | awk '{print $2}' | sort | uniq -c | sort -nr
My PID, 631457, comes out on top:
223504 631457
with 223k file descriptors in use. This is the code:
main.cpp:
/// Minimal reproduction: opens 440 echo WebSocket connections and ping-pongs a counter on each one.
///
/// Every client gets its own semaphore (released by the client's message-received callback) and its
/// own worker thread that sends a message, waits for the semaphore, then prints whatever was received.
int main(int argc, char **argv) {
    boost::asio::io_context ioctx;
    // Keeps ioctx.run() from returning while no handlers are queued yet.
    auto workGuard = boost::asio::make_work_guard(ioctx.get_executor());

    // Fixed pool of 16 IO worker threads that all service the same io_context.
    std::vector<std::thread> ioThreads;
    for (size_t i = 0; i < 16; ++i) {
        ioThreads.emplace_back([&ioctx]() { ioctx.run(); });
    }

    std::vector<std::shared_ptr<WebSocketClient>> clients;
    std::vector<std::shared_ptr<std::binary_semaphore>> semaphores;
    std::vector<std::thread> clientThreads;
    for (size_t i = 0; i < 440; ++i) {
        clients.push_back(std::make_shared<WebSocketClient>(ioctx, "ws.postman-echo.com", 443, "/raw", [](){}));
        semaphores.push_back(std::make_shared<std::binary_semaphore>(0));
        // The lambdas copy the shared_ptrs, so client and semaphore outlive any vector reallocation.
        clients.back()->setMessageReceivedCallback([sem = semaphores.back()]() {
            sem->release();
        });
        clientThreads.emplace_back([index = i, sem = semaphores.back(), client = clients.back()]() {
            size_t counter = 0;
            while (true) {
                client->sendMessage(std::to_string(counter++));
                // Blocks until the message-received callback signals that something arrived.
                sem->acquire();
                auto message = client->readMessageIfAvailable();
                if (message) {
                    std::cout << index << " : " << message.value() << std::endl;
                }
            }
        });
    }

    for (auto &thread : clientThreads) {
        thread.join();
    }
    // Release the guard before joining the IO threads; with the guard held, ioctx.run() never returns.
    workGuard.reset();
    for (auto &thread : ioThreads) {
        thread.join();
    }
}
WebsocketClient.h:
// Asynchronous TLS WebSocket client built on Boost.Beast.
// The constructor starts name resolution right away; the resolver, the websocket stream and the
// keep-alive timer are all bound to one strand created from the supplied io_context.
// Outgoing and incoming messages are exchanged with the caller through writeQueue / readQueue.
class WebSocketClient {
public:
/**
* Creates the client and immediately starts resolving the host.
*
* @param ioctx io_context whose executor is used to build this client's strand.
* @param host Host name to resolve; also used for SNI and for the Host header of the handshake.
* @param port Port to connect to (stored as a string for the resolver).
* @param path Request target passed to the WebSocket handshake.
* @param connectionClosedCallback Signals the connection was closed. This function may NOT cause the destructor of
* WebSocketClient to be called on this thread, since this will cause a deadlock. Always signal another thread.
* @param customKeepAliveMessage If non-empty, this text is sent periodically once the handshake has completed.
* @param customKeepAliveMessageIntervalInMs Delay in milliseconds between two keep-alive messages.
*/
WebSocketClient(boost::asio::io_context &ioctx, std::string host, uint16_t port, std::string path,
std::function<void()> connectionClosedCallback = std::function<void()>(),
std::string customKeepAliveMessage = "", unsigned int customKeepAliveMessageIntervalInMs = 0);
// Calls startClose(); see the definition for the pending-handler check.
~WebSocketClient();
// Neither copyable nor movable: the asynchronous handlers capture `this`.
WebSocketClient(WebSocketClient &&other) = delete;
WebSocketClient(const WebSocketClient &other) = delete;
WebSocketClient &operator=(WebSocketClient &&other) = delete;
WebSocketClient &operator=(const WebSocketClient &other) = delete;
// Queues a message and schedules a write on the strand; returns false if the queue rejected it.
bool sendMessage(std::string message);
// Returns the next received message, or an empty optional when nothing is queued.
std::optional<std::string> readMessageIfAvailable();
// Registers the callback that readQueue invokes when a message has been written to it.
void setMessageReceivedCallback(std::function<void()> callback);
// Marks the client as closing and posts an asynchronous WebSocket close; no-op if already closing.
void startClose();
// Returns the closing flag (true once a close or a failure has been recorded).
bool closed();
private:
// Keep-alive: arm the timer / send the message and re-arm.
void startCustomKeepAliveMessageTimer();
void sendCustomKeepAliveMessage();
// Failure and close bookkeeping shared by every completion handler.
void handleFailure(boost::beast::error_code ec, char const *what);
void handleClose();
// Connection pipeline: resolve -> TCP connect -> TLS handshake -> WebSocket handshake.
void on_resolve(boost::beast::error_code ec, const boost::asio::ip::tcp::resolver::results_type &results);
void on_connect(boost::beast::error_code ec, const boost::asio::ip::tcp::resolver::results_type::endpoint_type &ep);
void on_ssl_handshake(boost::beast::error_code ec);
void on_handshake(boost::beast::error_code ec);
// Write loop: one message from writeQueue is in flight at a time.
void startWrite();
void writeCompleted(boost::beast::error_code ec, std::size_t bytes_transferred);
// Read loop: each completed read re-arms the next one.
void startRead();
void readCompleted(boost::beast::error_code ec, std::size_t bytes_transferred);
void on_close(boost::beast::error_code ec);
// NOTE: members are initialised in the order declared here, regardless of the constructor's init list.
boost::asio::io_context &ioContext;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::resolver resolver_;
boost::asio::ssl::context sslContext;
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
// Holds the message currently being written; must stay alive until async_write completes.
std::string writeBuffer;
boost::beast::flat_buffer readBuffer;
std::string host;
std::string port;
std::string path;
LockFreeQueue<std::string> writeQueue;
LockFreeQueue<std::string> readQueue;
// True while a write is in flight; also true from construction until the WebSocket handshake succeeds.
bool writing;
std::atomic<bool> closing;
/// Must be captured by the lambda that will be executed on async job returning, pending jobs = .use_count() - 1
std::shared_ptr<nullptr_t> asyncJobTracker;
std::function<void()> connectionClosedCallback;
boost::asio::steady_timer customKeepAliveTimer;
std::string customKeepAliveMessage;
unsigned int customKeepAliveMessageIntervalInMs;
};
WebsocketClient.cpp:
#include "WebSocketClient.h"
#include <iostream>
#include <utility>
//#include "CertificateManager.h"
/// Builds the client on a fresh strand of `ioctx` and immediately starts resolving `host:port`.
///
/// The initialiser list follows the member declaration order (the order in which members are
/// actually initialised), and the keep-alive message is moved rather than copied.
///
/// @param ioctx io_context that supplies the executor for the strand.
/// @param host Host name to resolve and connect to.
/// @param port Port number; stored as a string for the resolver.
/// @param path Request target for the WebSocket handshake.
/// @param connectionClosedCallback Invoked by handleClose() once the connection is considered closed.
/// @param customKeepAliveMessage Optional text that is sent periodically after the handshake.
/// @param customKeepAliveMessageIntervalInMs Interval between keep-alive messages in milliseconds.
WebSocketClient::WebSocketClient(boost::asio::io_context &ioctx,
                                 std::string host, uint16_t port, std::string path,
                                 std::function<void()> connectionClosedCallback,
                                 std::string customKeepAliveMessage,
                                 unsigned int customKeepAliveMessageIntervalInMs)
    : ioContext(ioctx),
      strand(boost::asio::make_strand(ioContext.get_executor())),
      resolver_(strand),
      sslContext(boost::asio::ssl::context::tlsv12_client),
      ws_(strand, sslContext),
      writeBuffer(),
      readBuffer(),
      host(std::move(host)),
      port(std::to_string(port)),
      path(std::move(path)),
      writeQueue(128),
      readQueue(128),
      writing(true),  // writes stay blocked until on_handshake() clears the flag
      closing(false),
      asyncJobTracker(std::make_shared<nullptr_t>()),
      connectionClosedCallback(std::move(connectionClosedCallback)),
      customKeepAliveTimer(strand),
      customKeepAliveMessage(std::move(customKeepAliveMessage)),
      customKeepAliveMessageIntervalInMs(customKeepAliveMessageIntervalInMs)
{
    // boost::system::error_code ec;
    // ec = CertificateManager::getCertificateManager().loadRootCertificates(sslContext);
    // if (ec) {
    //     handleFailure(ec, "loadRootCertificates");
    //     return;
    // }

    // Use the members (this->host / this->port): the parameters of the same name were moved from above.
    resolver_.async_resolve(
        this->host, this->port,
        [this, asyncJobTracker = asyncJobTracker](const boost::beast::error_code ec,
                                                  const boost::asio::ip::tcp::resolver::results_type& results) {
            on_resolve(ec, results);
        });
}
/// Requests a close, then checks whether handlers that captured the job tracker are still outstanding.
WebSocketClient::~WebSocketClient() {
    startClose();
    // Only the member's own reference should be left at this point.
    const bool handlersStillPending = asyncJobTracker.use_count() > 1;
    if (handlersStillPending) {
        // LogError <<"WEBSOCKET CONTAINS MAJOR ERROR, async functions not finished after destructor exit" << std::endl;
    }
}
/// Records the first failure by raising the closing flag, then runs the common close handling.
/// @param ec Error reported by the failed operation (only used by the disabled log line).
/// @param what Name of the failed step (only used by the disabled log line).
void WebSocketClient::handleFailure(boost::beast::error_code ec, const char* what) {
    const bool alreadyClosing = closing;
    if (!alreadyClosing) {
        closing = true;
        // LogInfo << std::string() + what + ": " + ec.message() + "\n" << std::endl;
    }
    handleClose();
}
/// Fires the connection-closed callback once no other tracked handler is pending.
void WebSocketClient::handleClose() {
    // Two references are expected: the member itself and the handler that is calling us.
    const auto trackerRefs = asyncJobTracker.use_count();
    const bool lastHandler = trackerRefs <= 2;
    if (lastHandler) {
        // LogInfo << "Websocket connection closed" << std::endl;
        if (connectionClosedCallback) {
            connectionClosedCallback();
        }
    }
}
/// Resolver completion: on success starts the TCP connect towards the resolved endpoints.
/// @param ec Result of the resolve operation.
/// @param results Endpoints produced by the resolver.
void WebSocketClient::on_resolve(boost::beast::error_code ec,
                                 const boost::asio::ip::tcp::resolver::results_type& results) {
    if (ec) {
        handleFailure(ec, "resolve");
        return;
    }

    auto& tcpLayer = boost::beast::get_lowest_layer(ws_);
    // Give the connect attempt 30 seconds.
    tcpLayer.expires_after(std::chrono::seconds(30));
    tcpLayer.async_connect(
        results,
        [this, tracker = asyncJobTracker](
            boost::beast::error_code connectEc,
            const boost::asio::ip::tcp::resolver::results_type::endpoint_type& endpoint) {
            on_connect(connectEc, endpoint);
        });
}
/// TCP connect completion: sets the SNI host name and starts the TLS handshake.
/// @param ec Result of the connect operation.
/// @param ep Endpoint that was connected to; its port is appended to `host`.
void WebSocketClient::on_connect(boost::beast::error_code ec,
                                 const boost::asio::ip::tcp::resolver::results_type::endpoint_type& ep) {
    if (ec) {
        handleFailure(ec, "connect");
        return;
    }

    // The TLS handshake gets its own 30 second budget.
    boost::beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

    auto& tlsLayer = ws_.next_layer();
    // Many servers refuse the handshake without an SNI host name.
    if (SSL_set_tlsext_host_name(tlsLayer.native_handle(), host.c_str()) == 0) {
        const int sslError = static_cast<int>(::ERR_get_error());
        ec = boost::beast::error_code(sslError, boost::asio::error::get_ssl_category());
        handleFailure(ec, "connect");
        return;
    }

    // From here on `host` is "name:port"; it becomes the Host header of the WebSocket handshake.
    // See https://tools.ietf.org/html/rfc7230#section-5.4
    host.push_back(':');
    host.append(std::to_string(ep.port()));

    tlsLayer.async_handshake(
        boost::asio::ssl::stream_base::client,
        [this, tracker = asyncJobTracker](boost::beast::error_code tlsEc) { on_ssl_handshake(tlsEc); });
}
/// TLS handshake completion: configures the websocket stream and starts the WebSocket handshake.
/// @param ec Result of the TLS handshake.
void WebSocketClient::on_ssl_handshake(boost::beast::error_code ec) {
    if (ec) {
        handleFailure(ec, "ssl_handshake");
        return;
    }

    namespace websocket = boost::beast::websocket;

    // The websocket layer brings its own timeout handling, so the tcp_stream timer is disabled.
    boost::beast::get_lowest_layer(ws_).expires_never();
    ws_.set_option(websocket::stream_base::timeout::suggested(boost::beast::role_type::client));

    // Stamp the upgrade request with our User-Agent.
    ws_.set_option(websocket::stream_base::decorator(
        [](websocket::request_type& request) {
            request.set(boost::beast::http::field::user_agent, "Agnes");
        }));

    ws_.async_handshake(
        host, path,
        [this, tracker = asyncJobTracker](boost::beast::error_code wsEc) { on_handshake(wsEc); });
}
/// WebSocket handshake completion: unblocks writing, starts the read loop and, if configured, the keep-alive timer.
/// @param ec Result of the WebSocket handshake.
void WebSocketClient::on_handshake(boost::beast::error_code ec) {
    if (ec) {
        handleFailure(ec, "handshake");
        return;
    }

    // The constructor left `writing` set so that nothing was written before this point.
    writing = false;
    startWrite();
    startRead();

    if (!customKeepAliveMessage.empty()) {
        startCustomKeepAliveMessageTimer();
    }
}
/// Starts writing the next queued message unless a write is in flight, the client is closing,
/// or the write queue is empty.
void WebSocketClient::startWrite() {
    // Short-circuit order matters: the queue is only popped when we are actually allowed to write.
    if (writing || closing || !writeQueue.dequeue(&writeBuffer)) {
        return;
    }

    writing = true;
    auto onWritten = [this, tracker = asyncJobTracker](boost::beast::error_code ec,
                                                       std::size_t bytes_transferred) {
        writeCompleted(ec, bytes_transferred);
    };
    // writeBuffer is a member, so the bytes stay valid until the handler runs.
    ws_.async_write(boost::asio::buffer(writeBuffer), std::move(onWritten));
}
/// Write completion: on success frees the write slot and tries to send the next queued message.
/// @param ec Result of the write.
void WebSocketClient::writeCompleted(boost::beast::error_code ec, std::size_t /*bytes_transferred*/) {
    if (ec) {
        handleFailure(ec, "write");
        return;
    }
    writing = false;
    startWrite();
}
/// Arms one asynchronous read of a message into readBuffer.
void WebSocketClient::startRead() {
    auto onRead = [this, tracker = asyncJobTracker](boost::beast::error_code ec,
                                                    std::size_t bytes_transferred) {
        readCompleted(ec, bytes_transferred);
    };
    ws_.async_read(readBuffer, std::move(onRead));
}
void WebSocketClient::readCompleted(boost::beast::error_code ec, std::size_t bytes_transferred) {
if (ec) return handleFailure(ec, "read");
readQueue.enqueue({boost::asio::buffer_cast<const char*>(readBuffer.data()), readBuffer.size()});
readBuffer.consume(bytes_transferred);
startRead();
}
/// Initiates a graceful WebSocket close exactly once.
///
/// The closing flag is claimed with a single atomic exchange: the old separate "test, then set"
/// allowed two threads to both observe `false` and both proceed.
void WebSocketClient::startClose() {
    if (closing.exchange(true)) {
        return;  // a close or a failure has already been recorded
    }
    // The stream may only be touched from the strand, so the close is posted there.
    boost::asio::post(strand, [this, asyncJobTracker = asyncJobTracker]() {
        ws_.async_close(boost::beast::websocket::close_code::normal,
                        [this, asyncJobTracker = asyncJobTracker](boost::beast::error_code ec) { on_close(ec); });
    });
}
/// Close completion: routes errors through handleFailure(), otherwise finishes the close directly.
/// @param ec Result of the close operation.
void WebSocketClient::on_close(boost::beast::error_code ec) {
    if (!ec) {
        handleClose();
        return;
    }
    handleFailure(ec, "close");
}
/// Queues `message` for sending and schedules a write attempt on the strand.
/// @param message Payload to send.
/// @return true if the message was queued, false if the write queue rejected it.
bool WebSocketClient::sendMessage(std::string message) {
    const bool queued = writeQueue.enqueue(std::move(message));
    if (!queued) {
        return false;
    }
    boost::asio::post(strand, [this, tracker = asyncJobTracker]() { startWrite(); });
    return true;
}
/// Pops the oldest received message, if there is one.
/// @return The message, or an empty optional when the read queue is empty.
std::optional<std::string> WebSocketClient::readMessageIfAvailable() {
    std::string message;
    if (!readQueue.dequeue(&message)) {
        return std::nullopt;
    }
    return message;
}
/// Installs the callback that the read queue invokes when a new message has been written to it.
/// @param callback Callable to forward to the read queue.
void WebSocketClient::setMessageReceivedCallback(std::function<void()> callback) {
    auto& incoming = readQueue;
    incoming.setNewMessageWrittenCallback(std::move(callback));
}
bool WebSocketClient::closed() {
return closing;
}
/// Arms the keep-alive timer on the strand; when it expires without error the keep-alive message is sent.
void WebSocketClient::startCustomKeepAliveMessageTimer() {
    boost::asio::post(strand, [this]() {
        const std::chrono::milliseconds interval(customKeepAliveMessageIntervalInMs);
        customKeepAliveTimer.expires_after(interval);
        customKeepAliveTimer.async_wait([this](const boost::system::error_code& ec) {
            if (ec) {
                return;  // wait did not complete normally: do not send
            }
            sendCustomKeepAliveMessage();
        });
    });
}
/// Sends one keep-alive message and re-arms the timer, unless the client is closing.
void WebSocketClient::sendCustomKeepAliveMessage() {
    if (closing) {
        return;
    }
    sendMessage(customKeepAliveMessage);
    // Schedule the next keep-alive.
    startCustomKeepAliveMessageTimer();
}
To reduce the amount of code posted here I have not included the implementation of the LockFreeQueue but I have tested this class thoroughly. The problem does not lie there.
Furthermore, the WebsocketClient class implementation is very heavily based on the Boost.Beast example.
I expect the websocket connections to take up some file descriptors, but this is an astronomical amount.
The problem is likely in code not shown. I made the code self-contained (admittedly butchering some of the logic, but hey, it needs to compile):
File WebSocketClient.h
#pragma once
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <deque>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using asio::ip::tcp;
using boost::beast::error_code;
/// Mutex-protected FIFO used as a stand-in for the original LockFreeQueue.
///
/// An optional callback is invoked after every successful enqueue. The callback runs *after* the
/// internal mutex has been released, so it may call back into the queue (e.g. dequeue) without
/// deadlocking on the non-recursive mutex.
template <typename T> struct LockFreeQueue {
    /// The capacity argument is accepted for interface compatibility and ignored.
    LockFreeQueue(int) {}

    /// Takes over the elements and the callback of `rhs`, leaving `rhs` empty.
    /// Only `rhs` is locked: the object under construction is not yet visible to other threads.
    LockFreeQueue(LockFreeQueue&& rhs) {
        std::lock_guard lk(rhs._mx);
        _impl.swap(rhs._impl);
        _cb.swap(rhs._cb);
    }

    /// Appends `v` and then notifies the registered callback, if any.
    /// @return Always true (this stand-in is unbounded).
    bool enqueue(T v) {
        std::function<void()> notify;
        {
            std::lock_guard lk(_mx);
            _impl.push_back(std::move(v));
            notify = _cb;  // copy under the lock, call outside of it
        }
        if (notify) {
            notify();
        }
        return true;
    }

    /// Pops the oldest element into `*v`.
    /// @param v Destination; must not be null (asserted; in NDEBUG builds a null `v` discards the element).
    /// @return false if the queue was empty, true otherwise.
    bool dequeue(T* v) {
        assert(v);
        std::lock_guard lk(_mx);
        if (_impl.empty())
            return false;
        if (v)
            *v = std::move(_impl.front());
        _impl.pop_front();
        return true;
    }

    /// Replaces the callback that is invoked after each enqueue.
    template <typename F> void setNewMessageWrittenCallback(F f) {
        std::lock_guard lk(_mx);
        _cb = std::move(f);
    }

private:
    std::deque<T> _impl;
    std::mutex mutable _mx;
    std::function<void()> _cb;
};
// Asynchronous TLS WebSocket client built on Boost.Beast.
// The constructor starts name resolution right away; the resolver, the websocket stream and the
// keep-alive timer are all bound to one strand created from the supplied io_context.
// Outgoing and incoming messages are exchanged with the caller through writeQueue / readQueue.
class WebSocketClient {
public:
// Creates the client and immediately starts resolving host:port.
// connectionClosedCallback is invoked from handleClose(); a non-empty customKeepAliveMessage is
// sent every customKeepAliveMessageIntervalInMs milliseconds once the handshake has completed.
WebSocketClient(asio::io_context& ioctx, std::string host, uint16_t port, std::string path,
std::function<void()> connectionClosedCallback = std::function<void()>(),
std::string customKeepAliveMessage = "",
unsigned int customKeepAliveMessageIntervalInMs = 0);
// Calls startClose(); see the definition for the pending-handler check.
~WebSocketClient();
// Neither copyable nor movable: the asynchronous handlers capture `this`.
WebSocketClient(WebSocketClient&&) = delete;
WebSocketClient(WebSocketClient const&) = delete;
WebSocketClient& operator=(WebSocketClient&&) = delete;
WebSocketClient& operator=(WebSocketClient const&) = delete;
// Queues a message and schedules a write on the strand; returns false if the queue rejected it.
bool sendMessage(std::string message);
// Returns the next received message, or an empty optional when nothing is queued.
std::optional<std::string> readMessageIfAvailable();
// Registers the callback that readQueue invokes when a message has been written to it.
void setMessageReceivedCallback(std::function<void()> callback);
// Marks the client as closing and posts an asynchronous WebSocket close; no-op if already closing.
void startClose();
// Returns the closing flag (true once a close or a failure has been recorded).
bool closed();
private:
// Keep-alive: arm the timer / send the message and re-arm.
void startCustomKeepAliveMessageTimer();
void sendCustomKeepAliveMessage();
using endpoints = tcp::resolver::results_type;
// Failure and close bookkeeping shared by every completion handler.
void handleFailure(error_code ec, char const* what);
void handleClose();
// Connection pipeline: resolve -> TCP connect -> TLS handshake -> WebSocket handshake.
void on_resolve(error_code ec, endpoints const& results);
void on_connect(error_code ec, tcp::endpoint const& ep);
void on_ssl_handshake(error_code ec);
void on_handshake(error_code ec);
// Write loop: one message from writeQueue is in flight at a time.
void startWrite();
void writeCompleted(error_code ec, size_t bytes_transferred);
// Read loop: each completed read re-arms the next one.
void startRead();
void readCompleted(error_code ec, size_t bytes_transferred);
void on_close(error_code ec);
// NOTE: members are initialised in the order declared here, regardless of the constructor's init list.
asio::io_context& ioContext;
asio::strand<asio::io_context::executor_type> strand;
tcp::resolver resolver_;
ssl::context sslContext;
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
// Holds the message currently being written; must stay alive until async_write completes.
std::string writeBuffer;
beast::flat_buffer readBuffer;
std::string host;
std::string port;
std::string path;
LockFreeQueue<std::string> writeQueue;
LockFreeQueue<std::string> readQueue;
// True while a write is in flight; also true from construction until the WebSocket handshake succeeds.
bool writing;
std::atomic_bool closing;
/// Must be captured by the lambda that will be executed on async job returning, pending jobs =
/// .use_count() - 1
std::shared_ptr<void const> asyncJobTracker;
std::function<void()> connectionClosedCallback;
asio::steady_timer customKeepAliveTimer;
std::string customKeepAliveMessage;
unsigned int customKeepAliveMessageIntervalInMs;
};
File WebSocketClient.cpp
#include "WebSocketClient.h"
#include <iostream>
#include <utility>
// #include "CertificateManager.h"
//
// File-local log streams: LogInfo shares std::cout's stream buffer, LogError shares std::cerr's.
static std::ostream LogInfo(std::cout.rdbuf());
static std::ostream LogError(std::cerr.rdbuf());
/// Builds the client on a fresh strand of `ioctx` and immediately starts resolving `host:port`.
///
/// `asyncJobTracker` is now actually created. It used to stay a null shared_ptr, so `use_count()`
/// was always 0 and the pending-handler checks in the destructor and in handleClose() could never
/// trigger. The keep-alive message is also moved instead of copied.
///
/// @param ioctx io_context that supplies the executor for the strand.
/// @param host Host name to resolve and connect to.
/// @param port Port number; stored as a string for the resolver.
/// @param path Request target for the WebSocket handshake.
/// @param connectionClosedCallback Invoked by handleClose() once the connection is considered closed.
/// @param customKeepAliveMessage Optional text that is sent periodically after the handshake.
/// @param customKeepAliveMessageIntervalInMs Interval between keep-alive messages in milliseconds.
WebSocketClient::WebSocketClient(asio::io_context& ioctx, std::string host, uint16_t port, std::string path,
                                 std::function<void()> connectionClosedCallback,
                                 std::string customKeepAliveMessage,
                                 unsigned int customKeepAliveMessageIntervalInMs)
    : ioContext(ioctx)
    , strand(asio::make_strand(ioContext.get_executor()))
    , resolver_(strand)
    , sslContext(ssl::context::tlsv12_client)
    , ws_(strand, sslContext)
    , host(std::move(host))
    , port(std::to_string(port))
    , path(std::move(path))
    , writeQueue(128)
    , readQueue(128)
    , writing(true) // writes stay blocked until on_handshake() clears the flag
    , closing(false)
    , asyncJobTracker(std::make_shared<char>()) // payload is irrelevant, only the reference count is used
    , connectionClosedCallback(std::move(connectionClosedCallback))
    , customKeepAliveTimer(strand)
    , customKeepAliveMessage(std::move(customKeepAliveMessage))
    , customKeepAliveMessageIntervalInMs(customKeepAliveMessageIntervalInMs) {
    // error_code ec;
    // ec = CertificateManager::getCertificateManager().loadRootCertificates(sslContext);
    // if (ec) {
    //     handleFailure(ec, "loadRootCertificates");
    //     return;
    // }

    // Use the members (this->host / this->port): the parameters of the same name were moved from above.
    resolver_.async_resolve(
        this->host, this->port,
        [this, asyncJobTracker = asyncJobTracker](error_code ec, endpoints const& results) {
            on_resolve(ec, results);
        });
}
/// Requests a close, then reports if handlers that captured the job tracker are still outstanding.
WebSocketClient::~WebSocketClient() {
    startClose();
    // Only the member's own reference should be left at this point.
    const bool handlersStillPending = asyncJobTracker.use_count() > 1;
    if (handlersStillPending) {
        LogError << "WEBSOCKET CONTAINS MAJOR ERROR, async functions not finished after destructor exit"
                 << std::endl;
    }
}
/// Logs the first failure, raises the closing flag, then runs the common close handling.
/// @param ec Error reported by the failed operation.
/// @param what Name of the step that failed.
void WebSocketClient::handleFailure(error_code ec, char const* what) {
    const bool alreadyClosing = closing;
    if (!alreadyClosing) {
        closing = true;
        const std::string report = std::string() + what + ": " + ec.message() + "\n";
        LogInfo << report << std::endl;
    }
    handleClose();
}
/// Logs the close and fires the connection-closed callback once no other tracked handler is pending.
void WebSocketClient::handleClose() {
    // Two references are expected: the member itself and the handler that is calling us.
    const auto trackerRefs = asyncJobTracker.use_count();
    const bool lastHandler = trackerRefs <= 2;
    if (lastHandler) {
        LogInfo << "Websocket connection closed" << std::endl;
        if (connectionClosedCallback) {
            connectionClosedCallback();
        }
    }
}
/// Resolver completion: on success starts the TCP connect towards the resolved endpoints.
/// @param ec Result of the resolve operation.
/// @param results Endpoints produced by the resolver.
void WebSocketClient::on_resolve(error_code ec, endpoints const& results) {
    if (ec) {
        handleFailure(ec, "resolve");
        return;
    }

    auto& tcpLayer = beast::get_lowest_layer(ws_);
    // Give the connect attempt 30 seconds.
    tcpLayer.expires_after(std::chrono::seconds(30));
    tcpLayer.async_connect(
        results,
        [this, tracker = asyncJobTracker](error_code connectEc, tcp::endpoint const& endpoint) {
            on_connect(connectEc, endpoint);
        });
}
/// TCP connect completion: sets the SNI host name and starts the TLS handshake.
/// @param ec Result of the connect operation.
/// @param ep Endpoint that was connected to; its port is appended to `host`.
void WebSocketClient::on_connect(error_code ec, tcp::endpoint const& ep) {
    if (ec) {
        handleFailure(ec, "connect");
        return;
    }

    // The TLS handshake gets its own 30 second budget.
    beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

    auto& tlsLayer = ws_.next_layer();
    // Many servers refuse the handshake without an SNI host name.
    if (SSL_set_tlsext_host_name(tlsLayer.native_handle(), host.c_str()) == 0) {
        const int sslError = static_cast<int>(::ERR_get_error());
        ec = error_code(sslError, asio::error::get_ssl_category());
        handleFailure(ec, "connect");
        return;
    }

    // From here on `host` is "name:port"; it becomes the Host header of the WebSocket handshake.
    // See https://tools.ietf.org/html/rfc7230#section-5.4
    host.push_back(':');
    host.append(std::to_string(ep.port()));

    tlsLayer.async_handshake(
        ssl::stream_base::client,
        [this, tracker = asyncJobTracker](error_code tlsEc) { on_ssl_handshake(tlsEc); });
}
/// TLS handshake completion: configures the websocket stream and starts the WebSocket handshake.
/// @param ec Result of the TLS handshake.
void WebSocketClient::on_ssl_handshake(error_code ec) {
    if (ec) {
        handleFailure(ec, "ssl_handshake");
        return;
    }

    using stream_base = websocket::stream_base;

    // The websocket layer brings its own timeout handling, so the tcp_stream timer is disabled.
    beast::get_lowest_layer(ws_).expires_never();
    ws_.set_option(stream_base::timeout::suggested(beast::role_type::client));

    // Stamp the upgrade request with our User-Agent.
    ws_.set_option(stream_base::decorator(
        [](websocket::request_type& request) {
            request.set(beast::http::field::user_agent, "Agnes");
        }));

    ws_.async_handshake(
        host, path,
        [this, tracker = asyncJobTracker](error_code wsEc) { on_handshake(wsEc); });
}
/// WebSocket handshake completion: unblocks writing, starts the read loop and, if configured, the keep-alive timer.
/// @param ec Result of the WebSocket handshake.
void WebSocketClient::on_handshake(error_code ec) {
    if (ec) {
        handleFailure(ec, "handshake");
        return;
    }

    // The constructor left `writing` set so that nothing was written before this point.
    writing = false;
    startWrite();
    startRead();

    if (!customKeepAliveMessage.empty()) {
        startCustomKeepAliveMessageTimer();
    }
}
/// Starts writing the next queued message unless a write is in flight, the client is closing,
/// or the write queue is empty.
void WebSocketClient::startWrite() {
    // Short-circuit order matters: the queue is only popped when we are actually allowed to write.
    if (writing || closing || !writeQueue.dequeue(&writeBuffer)) {
        return;
    }

    writing = true;
    auto onWritten = [this, tracker = asyncJobTracker](error_code ec, size_t bytes_transferred) {
        writeCompleted(ec, bytes_transferred);
    };
    // writeBuffer is a member, so the bytes stay valid until the handler runs.
    ws_.async_write(asio::buffer(writeBuffer), std::move(onWritten));
}
/// Write completion: on success frees the write slot and tries to send the next queued message.
/// @param ec Result of the write.
void WebSocketClient::writeCompleted(error_code ec, size_t /*bytes_transferred*/) {
    if (ec) {
        handleFailure(ec, "write");
        return;
    }
    writing = false;
    startWrite();
}
/// Arms one asynchronous read of a message into readBuffer.
void WebSocketClient::startRead() {
    auto onRead = [this, tracker = asyncJobTracker](error_code ec, size_t bytes_transferred) {
        readCompleted(ec, bytes_transferred);
    };
    ws_.async_read(readBuffer, std::move(onRead));
}
void WebSocketClient::readCompleted(error_code ec, size_t bytes_transferred) {
if (ec)
return handleFailure(ec, "read");
readQueue.enqueue({asio::buffer_cast<char const*>(readBuffer.data()), readBuffer.size()});
readBuffer.consume(bytes_transferred);
startRead();
}
/// Initiates a graceful WebSocket close exactly once.
///
/// The closing flag is claimed with a single atomic exchange: the old separate "test, then set"
/// allowed two threads to both observe `false` and both proceed.
void WebSocketClient::startClose() {
    if (closing.exchange(true)) {
        return; // a close or a failure has already been recorded
    }
    // The stream may only be touched from the strand, so the close is posted there.
    asio::post(strand, [this, asyncJobTracker = asyncJobTracker]() {
        ws_.async_close(websocket::close_code::normal,
                        [this, asyncJobTracker = asyncJobTracker](error_code ec) { on_close(ec); });
    });
}
/// Close completion: routes errors through handleFailure(), otherwise finishes the close directly.
/// @param ec Result of the close operation.
void WebSocketClient::on_close(error_code ec) {
    if (!ec) {
        handleClose();
        return;
    }
    handleFailure(ec, "close");
}
/// Queues `message` for sending and schedules a write attempt on the strand.
/// @param message Payload to send.
/// @return true if the message was queued, false if the write queue rejected it.
bool WebSocketClient::sendMessage(std::string message) {
    const bool queued = writeQueue.enqueue(std::move(message));
    if (!queued) {
        return false;
    }
    asio::post(strand, [this, tracker = asyncJobTracker]() { startWrite(); });
    return true;
}
/// Pops the oldest received message, if there is one.
/// @return The message, or an empty optional when the read queue is empty.
std::optional<std::string> WebSocketClient::readMessageIfAvailable() {
    std::string message;
    if (!readQueue.dequeue(&message)) {
        return std::nullopt;
    }
    return message;
}
/// Installs the callback that the read queue invokes when a new message has been written to it.
/// @param callback Callable to forward to the read queue.
void WebSocketClient::setMessageReceivedCallback(std::function<void()> callback) {
    auto& incoming = readQueue;
    incoming.setNewMessageWrittenCallback(std::move(callback));
}
bool WebSocketClient::closed() { return closing; }
/// Arms the keep-alive timer on the strand; when it expires without error the keep-alive message is sent.
void WebSocketClient::startCustomKeepAliveMessageTimer() {
    asio::post(strand, [this]() {
        const std::chrono::milliseconds interval(customKeepAliveMessageIntervalInMs);
        customKeepAliveTimer.expires_after(interval);
        customKeepAliveTimer.async_wait([this](error_code ec) {
            if (ec) {
                return; // wait did not complete normally: do not send
            }
            sendCustomKeepAliveMessage();
        });
    });
}
/// Sends one keep-alive message and re-arms the timer, unless the client is closing.
void WebSocketClient::sendCustomKeepAliveMessage() {
    if (closing) {
        return;
    }
    sendMessage(customKeepAliveMessage);
    // Schedule the next keep-alive.
    startCustomKeepAliveMessageTimer();
}
File test.cpp
#include "WebSocketClient.h"
#include <iostream>
#include <list>
int main() {
asio::io_context ioctx;
auto workGuard = make_work_guard(ioctx.get_executor());
// Initialise our IO working threads, 1 for each CPU core.
std::list<std::thread> ioThreads;
for (size_t i = 0; i < 16; ++i)
ioThreads.emplace_back([&ioctx]() { ioctx.run(); });
struct Client {
std::unique_ptr<WebSocketClient> client;
std::unique_ptr<std::binary_semaphore> sem;
std::thread thread;
};
std::list<Client> clients;
for (size_t i = 0; i < 440; ++i) {
auto& [pclient, psem, thread] = clients.emplace_back();
pclient = std::make_unique<WebSocketClient>(ioctx, "ws.postman-echo.com", 443, "/raw", []() {});
auto& client = *pclient;
psem = std::make_unique<std::binary_semaphore>(0);
auto& sem = *psem;
client.setMessageReceivedCallback([&sem] { sem.release(); });
thread = std::thread([index = i, &sem, &client] {
size_t counter = 0;
while (true) {
client.sendMessage(std::to_string(counter++));
sem.acquire();
auto message = client.readMessageIfAvailable();
if (message) {
std::cout << index << " : " << message.value() << std::endl;
}
}
});
}
for (auto& c : clients)
if (c.thread.joinable())
c.thread.join();
workGuard.reset();
for (auto& thread : ioThreads)
if (thread.joinable())
thread.join();
}
Running it on my system opens 446 file descriptors in total, 440 of which are TCP sockets: