Search code examples
c++boostasio

440 Boost ASIO websocket connections take up 220k file descriptors


I had posted this earlier but hadn't posted a proper minimum working example, apologies for that.

The problem is quite clear. Starting 440 websocket connections with Boost ASIO eats up about 220,000 file descriptors.

Here's how I came to that conclusion: I sort the processes by the number of file descriptors each one uses: lsof -n | awk '{print $2}' | sort | uniq -c | sort -nr

My PID, 631457, comes out on top: 223504 631457 with 223k file descriptors in use. This is the code:

main.cpp:

int main(int argc, char **argv) {
    boost::asio::io_context ioctx;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> workGuard =
        boost::asio::make_work_guard(ioctx.get_executor());

    // Initialise our IO working threads, 1 for each CPU core.
    std::vector<std::shared_ptr<std::thread>> ioThreads;
    for (size_t i = 0; i < 16; ++i) {
        ioThreads.push_back(std::make_shared<std::thread>([&ioctx]() { ioctx.run(); }));
    }

    std::vector<std::shared_ptr<WebSocketClient>> clients;
    std::vector<std::shared_ptr<std::binary_semaphore>> semaphores;
    std::vector<std::shared_ptr<std::thread>> clientThreads;
    for(size_t i = 0; i < 440; ++i){
        clients.push_back(std::make_shared<WebSocketClient>(ioctx,  "ws.postman-echo.com", 443, "/raw",  [](){}));
        semaphores.push_back(std::make_shared<std::binary_semaphore>(0));
        clients.back()->setMessageReceivedCallback([sem = semaphores.back()](){
            sem->release();
        });

        clientThreads.push_back(std::make_shared<std::thread>([index = i, sem = semaphores.back(), client = clients.back()](){
            size_t counter = 0;
            while(true){
                client->sendMessage(std::to_string(counter++));
                sem->acquire();
                auto message = client->readMessageIfAvailable();
                if(message) {
                    std::cout << index << " : " << message.value() << std::endl;
                }
            }
        }));
    }


    for (auto &thread : clientThreads) {
        thread->join();
    }

    for (auto &thread : ioThreads) {
        thread->join();
    }
}

WebsocketClient.h:

// Asynchronous TLS WebSocket client. The constructor immediately starts
// resolve -> connect -> TLS handshake -> WebSocket handshake; afterwards
// outgoing messages are taken from writeQueue and incoming ones are pushed
// onto readQueue. All socket work is bound to a single strand.
class WebSocketClient {
  public:
    /**
     * Creates the client and starts resolving the host right away.
     *
     * @param ioctx IO context whose executor the internal strand is created from.
     * @param host Host name to resolve; also used for SNI and (with the port appended) for the
     * WebSocket handshake.
     * @param port Port number, passed to the resolver as a decimal string.
     * @param path Target path passed to the WebSocket handshake.
     * @param connectionClosedCallback Signals the connection was closed. This function may NOT cause the destructor of
     * WebSocketClient to be called on this thread, since this will cause a deadlock. Always signal another thread.
     * @param customKeepAliveMessage When non-empty, this message is sent periodically once the handshake succeeded.
     * @param customKeepAliveMessageIntervalInMs Delay in milliseconds between two keep-alive messages.
     */
    WebSocketClient(boost::asio::io_context &ioctx, std::string host, uint16_t port, std::string path,
                    std::function<void()> connectionClosedCallback = std::function<void()>(),

                    std::string customKeepAliveMessage = "", unsigned int customKeepAliveMessageIntervalInMs = 0);

    // Requests a close (startClose) before the members are destroyed.
    ~WebSocketClient();

    // Pending handlers capture `this`, so the object must never be copied or moved.
    WebSocketClient(WebSocketClient &&other) = delete;
    WebSocketClient(const WebSocketClient &other) = delete;
    WebSocketClient &operator=(WebSocketClient &&other) = delete;
    WebSocketClient &operator=(const WebSocketClient &other) = delete;

    // Queues a message for sending; returns false when the write queue rejects it.
    bool sendMessage(std::string message);
    // Pops one received message, or returns an empty optional when none is queued.
    std::optional<std::string> readMessageIfAvailable();
    // Registers the callback the read queue invokes when a message is written to it.
    void setMessageReceivedCallback(std::function<void()> callback);
    // Marks the client as closing and posts an asynchronous WebSocket close.
    void startClose();
    // Returns the closing flag: true once a close or failure has been recorded.
    bool closed();

  private:
    void startCustomKeepAliveMessageTimer();
    void sendCustomKeepAliveMessage();

    // Completion handlers of the connection set-up chain and of the read/write loops.
    void handleFailure(boost::beast::error_code ec, char const *what);
    void handleClose();
    void on_resolve(boost::beast::error_code ec, const boost::asio::ip::tcp::resolver::results_type &results);
    void on_connect(boost::beast::error_code ec, const boost::asio::ip::tcp::resolver::results_type::endpoint_type &ep);
    void on_ssl_handshake(boost::beast::error_code ec);
    void on_handshake(boost::beast::error_code ec);
    void startWrite();
    void writeCompleted(boost::beast::error_code ec, std::size_t bytes_transferred);
    void startRead();
    void readCompleted(boost::beast::error_code ec, std::size_t bytes_transferred);
    void on_close(boost::beast::error_code ec);

    boost::asio::io_context &ioContext;
    // Executor for the resolver, the stream and the keep-alive timer.
    boost::asio::strand<boost::asio::io_context::executor_type> strand;

    boost::asio::ip::tcp::resolver resolver_;
    boost::asio::ssl::context sslContext;
    boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
    // Holds the message currently being written; must stay alive until the write completes.
    std::string writeBuffer;
    boost::beast::flat_buffer readBuffer;
    std::string host;
    std::string port;
    std::string path;
    LockFreeQueue<std::string> writeQueue;
    LockFreeQueue<std::string> readQueue;
    // True while a write is in flight; also true from construction until the WebSocket handshake completes.
    bool writing;
    std::atomic<bool> closing;
    /// Must be captured by the lambda that will be executed on async job returning, pending jobs = .use_count() - 1
    std::shared_ptr<nullptr_t> asyncJobTracker;
    std::function<void()> connectionClosedCallback;

    boost::asio::steady_timer customKeepAliveTimer;
    std::string customKeepAliveMessage;
    unsigned int customKeepAliveMessageIntervalInMs;
};

WebsocketClient.cpp:

#include "WebSocketClient.h"

#include <iostream>
#include <utility>

//#include "CertificateManager.h"

/**
 * Builds the client and immediately starts the asynchronous DNS lookup; the
 * rest of the connection set-up continues from on_resolve().
 *
 * The initialiser list follows the member declaration order, which is the
 * order the members are actually initialised in.
 *
 * @param ioctx IO context whose executor backs the strand.
 * @param host Host name to resolve and connect to.
 * @param port Port number; stored as a string for the resolver.
 * @param path Target path for the WebSocket handshake.
 * @param connectionClosedCallback Invoked by handleClose() once the connection is down.
 * @param customKeepAliveMessage Optional message sent periodically after the handshake.
 * @param customKeepAliveMessageIntervalInMs Keep-alive period in milliseconds.
 */
WebSocketClient::WebSocketClient(boost::asio::io_context &ioctx,
                                 std::string host, uint16_t port, std::string path,
                                 std::function<void()> connectionClosedCallback,
                                 std::string customKeepAliveMessage,
                                 unsigned int customKeepAliveMessageIntervalInMs)
    : ioContext(ioctx),
      strand(boost::asio::make_strand(ioContext.get_executor())),
      resolver_(strand),
      sslContext(boost::asio::ssl::context::tlsv12_client),
      ws_(strand, sslContext),
      writeBuffer(),
      readBuffer(),
      host(std::move(host)),
      port(std::to_string(port)),
      path(std::move(path)),
      writeQueue(128),
      readQueue(128),
      writing(true),  // Blocks startWrite() until the WebSocket handshake has completed.
      closing(false),
      asyncJobTracker(std::make_shared<nullptr_t>()),
      connectionClosedCallback(std::move(connectionClosedCallback)),
      customKeepAliveTimer(strand),
      customKeepAliveMessage(std::move(customKeepAliveMessage)),
      customKeepAliveMessageIntervalInMs(customKeepAliveMessageIntervalInMs)
{
//    boost::system::error_code ec;
//    ec = CertificateManager::getCertificateManager().loadRootCertificates(sslContext);
//    if (ec) {
//        handleFailure(ec, "loadRootCertificates");
//        return;
//    }
    // The parameters shadow the members and have been moved from: use this->.
    resolver_.async_resolve(
        this->host, this->port,
        [this, asyncJobTracker = asyncJobTracker](const boost::beast::error_code ec,
                                                  const boost::asio::ip::tcp::resolver::results_type& results) {
            on_resolve(ec, results);
        });
}

// Requests a close and then checks whether handlers still hold the job tracker.
// NOTE(review): handlers capture `this`; any that are still pending at this
// point would run against a destroyed object - confirm the owner drains them.
WebSocketClient::~WebSocketClient() {
    startClose();

    const bool handlersStillPending = asyncJobTracker.use_count() > 1;
    if (handlersStillPending) {
//        LogError <<"WEBSOCKET CONTAINS MAJOR ERROR, async functions not finished after destructor exit" << std::endl;
    }
}

/**
 * Records a failed asynchronous step and runs the close handling.
 *
 * @param ec Error reported by the failed operation.
 * @param what Short name of the step that failed (used for logging).
 */
void WebSocketClient::handleFailure(boost::beast::error_code ec, const char* what) {
    // exchange() tests and sets the flag in one atomic step, so a concurrent
    // startClose() from another thread cannot also see "not yet closing".
    if (!closing.exchange(true)) {
//        LogInfo << std::string() + what + ": " + ec.message() + "\n" << std::endl;
    }
    handleClose();
}

// Invokes the connection-closed callback, but only when no other asynchronous
// job is still holding the tracker.
void WebSocketClient::handleClose() {
    // Two owners are expected here: the member itself and the handler that called us.
    const bool otherJobsPending = asyncJobTracker.use_count() > 2;
    if (otherJobsPending) {
        return;
    }
//    LogInfo << "Websocket connection closed" << std::endl;

    if (!connectionClosedCallback) {
        return;
    }
    connectionClosedCallback();
}

// Completion handler of the DNS lookup: starts the TCP connect to the resolved
// endpoints, guarded by a 30 second timeout.
void WebSocketClient::on_resolve(boost::beast::error_code ec,
                                 const boost::asio::ip::tcp::resolver::results_type& results) {
    if (ec) {
        handleFailure(ec, "resolve");
        return;
    }

    auto& tcpLayer = boost::beast::get_lowest_layer(ws_);

    // The connect attempt must finish within this window.
    tcpLayer.expires_after(std::chrono::seconds(30));

    tcpLayer.async_connect(
        results,
        [this, pendingJob = asyncJobTracker](
            boost::beast::error_code connectEc,
            const boost::asio::ip::tcp::resolver::results_type::endpoint_type& endpoint) {
            on_connect(connectEc, endpoint);
        });
}

// Completion handler of the TCP connect: sets the SNI host name, appends the
// port to `host` for the later Host header, and starts the TLS handshake.
void WebSocketClient::on_connect(boost::beast::error_code ec,
                                 const boost::asio::ip::tcp::resolver::results_type::endpoint_type& ep) {
    if (ec) {
        handleFailure(ec, "connect");
        return;
    }

    // The TLS handshake gets its own 30 second budget.
    boost::beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

    auto& tlsLayer = ws_.next_layer();

    // SNI: many hosts need the server name to handshake successfully.
    const bool sniAccepted = SSL_set_tlsext_host_name(tlsLayer.native_handle(), host.c_str()) != 0;
    if (!sniAccepted) {
        const auto opensslError = static_cast<int>(::ERR_get_error());
        handleFailure(boost::beast::error_code(opensslError, boost::asio::error::get_ssl_category()), "connect");
        return;
    }

    // From here on `host` carries "name:port", the value of the Host HTTP header
    // sent during the WebSocket handshake.
    // See https://tools.ietf.org/html/rfc7230#section-5.4
    host += ':';
    host += std::to_string(ep.port());

    tlsLayer.async_handshake(
        boost::asio::ssl::stream_base::client,
        [this, pendingJob = asyncJobTracker](boost::beast::error_code handshakeEc) {
            on_ssl_handshake(handshakeEc);
        });
}

// Completion handler of the TLS handshake: configures the WebSocket stream
// (timeouts, User-Agent) and starts the WebSocket handshake.
void WebSocketClient::on_ssl_handshake(boost::beast::error_code ec) {
    namespace ws = boost::beast::websocket;

    if (ec) {
        handleFailure(ec, "ssl_handshake");
        return;
    }

    // The websocket stream has its own timeout system, so the tcp_stream timer is switched off.
    boost::beast::get_lowest_layer(ws_).expires_never();

    // Suggested client-side timeout settings.
    const auto clientTimeouts = ws::stream_base::timeout::suggested(boost::beast::role_type::client);
    ws_.set_option(clientTimeouts);

    // Decorator that sets the User-Agent of the handshake request.
    auto setUserAgent = [](ws::request_type& request) {
        request.set(boost::beast::http::field::user_agent, "Agnes");
    };
    ws_.set_option(ws::stream_base::decorator(setUserAgent));

    ws_.async_handshake(host, path, [this, pendingJob = asyncJobTracker](boost::beast::error_code handshakeEc) {
        on_handshake(handshakeEc);
    });
}

// Completion handler of the WebSocket handshake: unblocks writing, starts the
// write and read loops and, when configured, the keep-alive timer.
void WebSocketClient::on_handshake(boost::beast::error_code ec) {
    if (ec) {
        handleFailure(ec, "handshake");
        return;
    }

    // `writing` was true since construction to hold back queued messages.
    writing = false;
    startWrite();
    startRead();

    const bool keepAliveConfigured = !customKeepAliveMessage.empty();
    if (keepAliveConfigured) {
        startCustomKeepAliveMessageTimer();
    }
}

void WebSocketClient::startWrite() {
    if (writing || closing) {
        return;
    }
    if (!writeQueue.dequeue(&writeBuffer)) {
        return;
    }

    writing = true;
    ws_.async_write(boost::asio::buffer(writeBuffer), [this, asyncJobTracker = asyncJobTracker](
                                                          boost::beast::error_code ec, std::size_t bytes_transferred) {
        writeCompleted(ec, bytes_transferred);
    });
}

// Completion handler of a write: on success, clears the in-flight flag and
// tries to send the next queued message.
void WebSocketClient::writeCompleted(boost::beast::error_code ec, std::size_t /*bytes_transferred*/) {
    if (ec) {
        handleFailure(ec, "write");
        return;
    }

    writing = false;
    startWrite();
}

void WebSocketClient::startRead() {
    ws_.async_read(readBuffer, [this, asyncJobTracker = asyncJobTracker](boost::beast::error_code ec,
                                                                         std::size_t bytes_transferred) {
        readCompleted(ec, bytes_transferred);
    });
}

/**
 * Completion handler of a read: copies the received message into the read
 * queue, releases the consumed bytes and starts the next read.
 *
 * @param ec Result of the read.
 * @param bytes_transferred Number of bytes the read appended to readBuffer.
 */
void WebSocketClient::readCompleted(boost::beast::error_code ec, std::size_t bytes_transferred) {
    if (ec) return handleFailure(ec, "read");

    // const_buffer::data()/size() replace the deprecated boost::asio::buffer_cast.
    const auto received = readBuffer.data();
    readQueue.enqueue(std::string(static_cast<const char*>(received.data()), received.size()));
    readBuffer.consume(bytes_transferred);
    startRead();
}

/**
 * Starts a graceful WebSocket close. Only the first caller has any effect.
 */
void WebSocketClient::startClose() {
    // Atomic test-and-set: with a separate load and store, two threads could
    // both pass the check and post two closes.
    if (closing.exchange(true)) {
        return;
    }
    // The close is issued from the strand, like every other stream operation.
    boost::asio::post(strand, [this, asyncJobTracker = asyncJobTracker]() {
        ws_.async_close(boost::beast::websocket::close_code::normal,
                        [this, asyncJobTracker = asyncJobTracker](boost::beast::error_code ec) { on_close(ec); });
    });
}

// Completion handler of the WebSocket close.
void WebSocketClient::on_close(boost::beast::error_code ec) {
    if (!ec) {
        handleClose();
        return;
    }
    handleFailure(ec, "close");
}

// Queues `message` for sending and schedules a write attempt on the strand.
// Returns false when the write queue did not accept the message.
bool WebSocketClient::sendMessage(std::string message) {
    const bool queued = writeQueue.enqueue(std::move(message));
    if (!queued) {
        return false;
    }

    boost::asio::post(strand, [this, pendingJob = asyncJobTracker]() { startWrite(); });
    return true;
}

// Returns the oldest received message, or an empty optional when the read
// queue has nothing to hand out.
std::optional<std::string> WebSocketClient::readMessageIfAvailable() {
    std::string message;
    if (!readQueue.dequeue(&message)) {
        return std::nullopt;
    }
    return message;
}

// Hands `callback` to the read queue, which invokes it when a message is written to it.
void WebSocketClient::setMessageReceivedCallback(std::function<void()> callback) {
    auto& incoming = readQueue;
    incoming.setNewMessageWrittenCallback(std::move(callback));
}

// True once a close has been requested or a failure has been recorded.
bool WebSocketClient::closed() {
    return closing.load();
}

// Arms the keep-alive timer on the strand; when it expires without error the
// keep-alive message is sent.
// NOTE(review): unlike the other handlers, these lambdas capture only `this`
// and not asyncJobTracker - confirm that is intentional.
void WebSocketClient::startCustomKeepAliveMessageTimer() {
    auto armTimer = [this]() {
        // Ping interval.
        const auto interval = std::chrono::milliseconds(customKeepAliveMessageIntervalInMs);
        customKeepAliveTimer.expires_after(interval);
        customKeepAliveTimer.async_wait([this](const boost::system::error_code& waitResult) {
            if (waitResult) {
                return;
            }
            sendCustomKeepAliveMessage();
        });
    };
    boost::asio::post(strand, std::move(armTimer));
}

// Sends one keep-alive message and re-arms the timer; does nothing once the
// client is closing.
void WebSocketClient::sendCustomKeepAliveMessage() {
    if (closing) {
        return;
    }
    sendMessage(customKeepAliveMessage);
    startCustomKeepAliveMessageTimer();  // Schedule the next ping.
}

To reduce the amount of code posted here I have not included the implementation of the LockFreeQueue but I have tested this class thoroughly. The problem does not lie there.

Furthermore, the WebsocketClient class implementation is very heavily based on the Boost example.

I expect the websocket connections to take up some file descriptors, but this is an astronomical amount.


Solution

  • The problem is likely in code not shown. I made the code self-contained (admittedly butchering some of the logic, but hey, it needs to compile):

    • File WebSocketClient.h

       #pragma once
       #include <boost/asio.hpp>
       #include <boost/beast.hpp>
       #include <boost/beast/ssl.hpp>
       #include <deque>
       namespace asio      = boost::asio;
       namespace ssl       = asio::ssl;
       namespace beast     = boost::beast;
       namespace websocket = beast::websocket;
       using asio::ip::tcp;
       using boost::beast::error_code;
      
       /**
        * FIFO queue with an optional "new element" notification callback.
        *
        * Despite its name, this implementation serialises all access with a mutex.
        * The callback is invoked after the mutex has been released, so it may
        * safely call back into the same queue.
        */
       template <typename T> struct LockFreeQueue {
           /// The integer argument is accepted and ignored.
           LockFreeQueue(int) {}

           /// Takes over the queued elements of `rhs`; the callback is not transferred.
           LockFreeQueue(LockFreeQueue&& rhs) {
               std::scoped_lock lk(_mx, rhs._mx); // locks both without deadlock
               std::swap(rhs._impl, _impl);
           }

           /// Appends `v`, then runs the registered callback (if any). Always returns true.
           bool enqueue(T v) {
               std::function<void()> notify;
               {
                   std::lock_guard lk(_mx);
                   _impl.push_back(std::move(v));
                   notify = _cb; // copy under the lock, call outside it
               }
               if (notify) {
                   notify();
               }
               return true;
           }

           /// Moves the oldest element into `*v`; returns false when the queue is empty.
           bool dequeue(T* v) {
               assert(v);
               std::lock_guard lk(_mx);
               if (_impl.empty())
                   return false;
               if (v)
                   *v = std::move(_impl.front());
               _impl.pop_front();
               return true;
           }

           /// Replaces the callback that enqueue() runs after each insertion.
           template <typename F> void setNewMessageWrittenCallback(F f) {
               std::lock_guard lk(_mx);
               _cb = std::move(f);
           }

         private:
           std::deque<T> _impl;
           std::mutex mutable _mx;
           std::function<void()> _cb;
       };
      
       // Asynchronous TLS WebSocket client. The constructor immediately starts
       // resolve -> connect -> TLS handshake -> WebSocket handshake; afterwards
       // outgoing messages are taken from writeQueue and incoming ones are pushed
       // onto readQueue. All socket work is bound to a single strand.
       class WebSocketClient {
         public:
           // host/port/path identify the endpoint; connectionClosedCallback is invoked
           // from handleClose(); a non-empty customKeepAliveMessage is sent every
           // customKeepAliveMessageIntervalInMs milliseconds once the handshake succeeded.
           WebSocketClient(asio::io_context& ioctx, std::string host, uint16_t port, std::string path,
                           std::function<void()> connectionClosedCallback           = std::function<void()>(),
                           std::string           customKeepAliveMessage             = "",
                           unsigned int          customKeepAliveMessageIntervalInMs = 0);
      
           // Requests a close (startClose) before the members are destroyed.
           ~WebSocketClient();
      
           // Pending handlers capture `this`, so the object must never be copied or moved.
           WebSocketClient(WebSocketClient&&)                 = delete;
           WebSocketClient(WebSocketClient const&)            = delete;
           WebSocketClient& operator=(WebSocketClient&&)      = delete;
           WebSocketClient& operator=(WebSocketClient const&) = delete;
      
           // sendMessage: queues a message; false when the write queue rejects it.
           // readMessageIfAvailable: pops one received message, or an empty optional.
           // setMessageReceivedCallback: callback run by the read queue on each new message.
           // startClose: marks the client as closing and posts an asynchronous close.
           // closed: returns the closing flag.
           bool                       sendMessage(std::string message);
           std::optional<std::string> readMessageIfAvailable();
           void                       setMessageReceivedCallback(std::function<void()> callback);
           void                       startClose();
           bool                       closed();
      
         private:
           void startCustomKeepAliveMessageTimer();
           void sendCustomKeepAliveMessage();
      
           // Completion handlers of the connection set-up chain and of the read/write loops.
           using endpoints = tcp::resolver::results_type;
           void handleFailure(error_code ec, char const* what);
           void handleClose();
           void on_resolve(error_code ec, endpoints const& results);
           void on_connect(error_code ec, tcp::endpoint const& ep);
           void on_ssl_handshake(error_code ec);
           void on_handshake(error_code ec);
           void startWrite();
           void writeCompleted(error_code ec, size_t bytes_transferred);
           void startRead();
           void readCompleted(error_code ec, size_t bytes_transferred);
           void on_close(error_code ec);
      
           asio::io_context&                             ioContext;
           // Executor for the resolver, the stream and the keep-alive timer.
           asio::strand<asio::io_context::executor_type> strand;
      
           tcp::resolver                                           resolver_;
           ssl::context                                            sslContext;
           websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
           // Holds the message currently being written; must stay alive until the write completes.
           std::string                                             writeBuffer;
           beast::flat_buffer                                      readBuffer;
           std::string                                             host;
           std::string                                             port;
           std::string                                             path;
           LockFreeQueue<std::string>                              writeQueue;
           LockFreeQueue<std::string>                              readQueue;
           // True while a write is in flight; also true from construction until the WebSocket handshake completes.
           bool                                                    writing;
           std::atomic_bool                                        closing;
           /// Must be captured by the lambda that will be executed on async job returning, pending jobs =
           /// .use_count() - 1
           std::shared_ptr<void const> asyncJobTracker;
           std::function<void()>       connectionClosedCallback;
      
           asio::steady_timer customKeepAliveTimer;
           std::string        customKeepAliveMessage;
           unsigned int       customKeepAliveMessageIntervalInMs;
       };
      
    • File WebSocketClient.cpp

       #include "WebSocketClient.h"
      
       #include <iostream>
       #include <utility>
      
       // #include "CertificateManager.h"
       //
       // File-local logging streams that share the stream buffers of std::cout
       // (informational messages) and std::cerr (errors).
       static std::ostream LogInfo(std::cout.rdbuf());
       static std::ostream LogError(std::cerr.rdbuf());
      
       /**
        * Builds the client and immediately starts the asynchronous DNS lookup; the
        * rest of the connection set-up continues from on_resolve().
        *
        * @param ioctx IO context whose executor backs the strand.
        * @param host Host name to resolve and connect to.
        * @param port Port number; stored as a string for the resolver.
        * @param path Target path for the WebSocket handshake.
        * @param connectionClosedCallback Invoked by handleClose() once the connection is down.
        * @param customKeepAliveMessage Optional message sent periodically after the handshake.
        * @param customKeepAliveMessageIntervalInMs Keep-alive period in milliseconds.
        */
       WebSocketClient::WebSocketClient(asio::io_context& ioctx, std::string host, uint16_t port, std::string path,
                                        std::function<void()> connectionClosedCallback,
                                        std::string           customKeepAliveMessage,
                                        unsigned int          customKeepAliveMessageIntervalInMs)
           : ioContext(ioctx)
           , strand(asio::make_strand(ioContext.get_executor()))
           , resolver_(strand)
           , sslContext(ssl::context::tlsv12_client)
           , ws_(strand, sslContext)
           , host(std::move(host))
           , port(std::to_string(port))
           , path(std::move(path))
           , writeQueue(128)
           , readQueue(128)
           , writing(true) // Blocks startWrite() until the WebSocket handshake has completed.
           , closing(false)
           // The tracker must own a control block: with a null shared_ptr use_count()
           // is always 0 and the pending-job checks can never trigger.
           , asyncJobTracker(std::make_shared<std::nullptr_t>())
           , connectionClosedCallback(std::move(connectionClosedCallback))
           , customKeepAliveTimer(strand)
           , customKeepAliveMessage(std::move(customKeepAliveMessage))
           , customKeepAliveMessageIntervalInMs(customKeepAliveMessageIntervalInMs) {
           //    error_code ec;
           //    ec = CertificateManager::getCertificateManager().loadRootCertificates(sslContext);
           //    if (ec) {
           //        handleFailure(ec, "loadRootCertificates");
           //        return;
           //    }
           // The parameters shadow the members and have been moved from: use this->.
           resolver_.async_resolve(
               this->host, this->port,
               [this, asyncJobTracker = asyncJobTracker](error_code ec, endpoints const& results) {
                   on_resolve(ec, results);
               });
       }
      
       // Requests a close, then reports when handlers still hold the job tracker.
       // NOTE(review): handlers capture `this`; any that are still pending at this
       // point would run against a destroyed object - confirm the owner drains them.
       WebSocketClient::~WebSocketClient() {
           startClose();

           const bool handlersStillPending = asyncJobTracker.use_count() > 1;
           if (!handlersStillPending)
               return;

           LogError << "WEBSOCKET CONTAINS MAJOR ERROR, async functions not finished after destructor exit"
                    << std::endl;
       }
      
       /**
        * Records a failed asynchronous step, logs the first failure, and runs the
        * close handling.
        *
        * @param ec Error reported by the failed operation.
        * @param what Short name of the step that failed.
        */
       void WebSocketClient::handleFailure(error_code ec, char const* what) {
           // exchange() tests and sets the flag in one atomic step, so a concurrent
           // startClose() from another thread cannot also see "not yet closing".
           if (!closing.exchange(true)) {
               LogInfo << std::string() + what + ": " + ec.message() + "\n" << std::endl;
           }
           handleClose();
       }
      
       // Logs the closure and invokes the connection-closed callback, but only when
       // no other asynchronous job is still holding the tracker.
       void WebSocketClient::handleClose() {
           // Two owners are expected here: the member itself and the handler that called us.
           const bool otherJobsPending = asyncJobTracker.use_count() > 2;
           if (otherJobsPending)
               return;

           LogInfo << "Websocket connection closed" << std::endl;

           if (!connectionClosedCallback)
               return;
           connectionClosedCallback();
       }
      
       // Completion handler of the DNS lookup: starts the TCP connect to the
       // resolved endpoints, guarded by a 30 second timeout.
       void WebSocketClient::on_resolve(error_code ec, endpoints const& results) {
           if (ec) {
               handleFailure(ec, "resolve");
               return;
           }

           auto& tcpLayer = beast::get_lowest_layer(ws_);

           // The connect attempt must finish within this window.
           tcpLayer.expires_after(std::chrono::seconds(30));

           tcpLayer.async_connect(
               results, [this, pendingJob = asyncJobTracker](error_code connectEc, tcp::endpoint const& endpoint) {
                   on_connect(connectEc, endpoint);
               });
       }
      
       // Completion handler of the TCP connect: sets the SNI host name, appends the
       // port to `host` for the later Host header, and starts the TLS handshake.
       void WebSocketClient::on_connect(error_code ec, tcp::endpoint const& ep) {
           if (ec) {
               handleFailure(ec, "connect");
               return;
           }

           // The TLS handshake gets its own 30 second budget.
           beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

           auto& tlsLayer = ws_.next_layer();

           // SNI: many hosts need the server name to handshake successfully.
           const bool sniAccepted = SSL_set_tlsext_host_name(tlsLayer.native_handle(), host.c_str()) != 0;
           if (!sniAccepted) {
               const auto opensslError = static_cast<int>(::ERR_get_error());
               handleFailure(error_code(opensslError, asio::error::get_ssl_category()), "connect");
               return;
           }

           // From here on `host` carries "name:port", the value of the Host HTTP
           // header sent during the WebSocket handshake.
           // See https://tools.ietf.org/html/rfc7230#section-5.4
           host += ':';
           host += std::to_string(ep.port());

           tlsLayer.async_handshake(
               ssl::stream_base::client,
               [this, pendingJob = asyncJobTracker](error_code handshakeEc) { on_ssl_handshake(handshakeEc); });
       }
      
       // Completion handler of the TLS handshake: configures the WebSocket stream
       // (timeouts, User-Agent) and starts the WebSocket handshake.
       void WebSocketClient::on_ssl_handshake(error_code ec) {
           if (ec) {
               handleFailure(ec, "ssl_handshake");
               return;
           }

           // The websocket stream has its own timeout system, so the tcp_stream
           // timer is switched off.
           beast::get_lowest_layer(ws_).expires_never();

           // Suggested client-side timeout settings.
           const auto clientTimeouts = websocket::stream_base::timeout::suggested(beast::role_type::client);
           ws_.set_option(clientTimeouts);

           // Decorator that sets the User-Agent of the handshake request.
           auto setUserAgent = [](websocket::request_type& request) {
               request.set(beast::http::field::user_agent, "Agnes");
           };
           ws_.set_option(websocket::stream_base::decorator(setUserAgent));

           ws_.async_handshake(
               host, path, [this, pendingJob = asyncJobTracker](error_code handshakeEc) { on_handshake(handshakeEc); });
       }
      
       // Completion handler of the WebSocket handshake: unblocks writing, starts the
       // write and read loops and, when configured, the keep-alive timer.
       void WebSocketClient::on_handshake(error_code ec) {
           if (ec) {
               handleFailure(ec, "handshake");
               return;
           }

           // `writing` was true since construction to hold back queued messages.
           writing = false;
           startWrite();
           startRead();

           const bool keepAliveConfigured = !customKeepAliveMessage.empty();
           if (keepAliveConfigured)
               startCustomKeepAliveMessageTimer();
       }
      
       // Starts writing the next queued message, unless a write is already in
       // flight, the client is closing, or the queue is empty.
       void WebSocketClient::startWrite() {
           const bool mayStart = !writing && !closing;
           if (!mayStart || !writeQueue.dequeue(&writeBuffer))
               return;

           // writeBuffer is a member so the data outlives the asynchronous write.
           writing = true;
           ws_.async_write(asio::buffer(writeBuffer),
                           [this, pendingJob = asyncJobTracker](error_code writeEc, size_t written) {
                               writeCompleted(writeEc, written);
                           });
       }
      
       // Completion handler of a write: on success, clears the in-flight flag and
       // tries to send the next queued message.
       void WebSocketClient::writeCompleted(error_code ec, size_t /*bytes_transferred*/) {
           if (ec) {
               handleFailure(ec, "write");
               return;
           }

           writing = false;
           startWrite();
       }
      
       // Starts an asynchronous read of the next message into readBuffer.
       void WebSocketClient::startRead() {
           auto onRead = [this, pendingJob = asyncJobTracker](error_code readEc, size_t received) {
               readCompleted(readEc, received);
           };
           ws_.async_read(readBuffer, std::move(onRead));
       }
      
       /**
        * Completion handler of a read: copies the received message into the read
        * queue, releases the consumed bytes and starts the next read.
        *
        * @param ec Result of the read.
        * @param bytes_transferred Number of bytes the read appended to readBuffer.
        */
       void WebSocketClient::readCompleted(error_code ec, size_t bytes_transferred) {
           if (ec)
               return handleFailure(ec, "read");

           // buffers_to_string copies the readable bytes and replaces the
           // deprecated asio::buffer_cast.
           readQueue.enqueue(beast::buffers_to_string(readBuffer.data()));
           readBuffer.consume(bytes_transferred);
           startRead();
       }
      
       /**
        * Starts a graceful WebSocket close. Only the first caller has any effect.
        */
       void WebSocketClient::startClose() {
           // Atomic test-and-set: with a separate load and store, two threads could
           // both pass the check and post two closes.
           if (closing.exchange(true)) {
               return;
           }
           // The close is issued from the strand, like every other stream operation.
           asio::post(strand, [this, asyncJobTracker = asyncJobTracker]() {
               ws_.async_close(websocket::close_code::normal,
                               [this, asyncJobTracker = asyncJobTracker](error_code ec) { on_close(ec); });
           });
       }
      
       // Completion handler of the WebSocket close.
       void WebSocketClient::on_close(error_code ec) {
           if (!ec) {
               handleClose();
               return;
           }
           handleFailure(ec, "close");
       }
      
       // Queues `message` for sending and schedules a write attempt on the strand.
       // Returns false when the write queue did not accept the message.
       bool WebSocketClient::sendMessage(std::string message) {
           const bool queued = writeQueue.enqueue(std::move(message));
           if (!queued)
               return false;

           asio::post(strand, [this, pendingJob = asyncJobTracker]() { startWrite(); });
           return true;
       }
      
       // Returns the oldest received message, or an empty optional when the read
       // queue has nothing to hand out.
       std::optional<std::string> WebSocketClient::readMessageIfAvailable() {
           std::string message;
           if (!readQueue.dequeue(&message))
               return std::nullopt;
           return message;
       }
      
       // Hands `callback` to the read queue, which runs it after each enqueued message.
       void WebSocketClient::setMessageReceivedCallback(std::function<void()> callback) {
           auto& incoming = readQueue;
           incoming.setNewMessageWrittenCallback(std::move(callback));
       }
      
       // True once a close has been requested or a failure has been recorded.
       bool WebSocketClient::closed() {
           return closing.load();
       }
      
       // Arms the keep-alive timer on the strand; when it expires without error the
       // keep-alive message is sent.
       // NOTE(review): unlike the other handlers, these lambdas capture only `this`
       // and not asyncJobTracker - confirm that is intentional.
       void WebSocketClient::startCustomKeepAliveMessageTimer() {
           auto armTimer = [this]() {
               // Ping interval.
               const auto interval = std::chrono::milliseconds(customKeepAliveMessageIntervalInMs);
               customKeepAliveTimer.expires_after(interval);
               customKeepAliveTimer.async_wait([this](error_code waitResult) {
                   if (waitResult)
                       return;
                   sendCustomKeepAliveMessage();
               });
           };
           asio::post(strand, std::move(armTimer));
       }
      
       // Sends one keep-alive message and re-arms the timer; does nothing once the
       // client is closing.
       void WebSocketClient::sendCustomKeepAliveMessage() {
           if (closing)
               return;
           sendMessage(customKeepAliveMessage);
           startCustomKeepAliveMessageTimer(); // Schedule the next ping.
       }
      
    • File test.cpp

       #include "WebSocketClient.h"
       #include <iostream>
       #include <list>
      
       int main() {
           asio::io_context ioctx;
           auto             workGuard = make_work_guard(ioctx.get_executor());
      
           // Initialise our IO working threads, 1 for each CPU core.
           std::list<std::thread> ioThreads;
           for (size_t i = 0; i < 16; ++i)
               ioThreads.emplace_back([&ioctx]() { ioctx.run(); });
      
           struct Client {
               std::unique_ptr<WebSocketClient>       client;
               std::unique_ptr<std::binary_semaphore> sem;
               std::thread                            thread;
           };
      
           std::list<Client> clients;
           for (size_t i = 0; i < 440; ++i) {
               auto& [pclient, psem, thread] = clients.emplace_back();
      
               pclient      = std::make_unique<WebSocketClient>(ioctx, "ws.postman-echo.com", 443, "/raw", []() {});
               auto& client = *pclient;
      
               psem         = std::make_unique<std::binary_semaphore>(0);
               auto& sem    = *psem;
      
               client.setMessageReceivedCallback([&sem] { sem.release(); });
               thread = std::thread([index = i, &sem, &client] {
                   size_t counter = 0;
                   while (true) {
                       client.sendMessage(std::to_string(counter++));
                       sem.acquire();
                       auto message = client.readMessageIfAvailable();
                       if (message) {
                           std::cout << index << " : " << message.value() << std::endl;
                       }
                   }
               });
           }
      
           for (auto& c : clients)
               if (c.thread.joinable())
                   c.thread.join();
      
           workGuard.reset();
           for (auto& thread : ioThreads)
               if (thread.joinable())
                   thread.join();
       }
      

    Running it on my system opens 446 file descriptors in total, 440 of them TCP sockets:

    enter image description here