Search code examples
c++boostboost-asio

Error stream_impl.hpp:367:13 in function 'bool boost::beast::websocket::stream< <template-parameter-1-1>, <anonymous>


can you tell me where to look for the problem, I've been sitting for a couple of days now and I can't figure out what's the matter?

  1. My WS client successfully login and connects to the server, after which it waits for data to be sent from the main application.

  2. after 2-3 minutes of stable work, I get an error:

    [WSCO]: Connection failed: Operation canceled [system:125 at /usr/include/boost/beast/websocket/impl/stream_impl.hpp:367:13 in function 'bool boost::beast::websocket::stream< , >::impl_type::check_stop_now(boost::beast::error_code&)']. Retrying...

  3. After that, the cat tries to reconnect an infinite number of times (as it should be), but any attempt to reconnect immediately ends with the same error immediately

At the same time, no WS data was sent at that moment. Can you tell me how to fix this, or at least understand what the problem might be?

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <iomanip>
#include <iostream>
#include <nlohmann/json.hpp>
#include <openssl/hmac.h>
#include <queue>
#include <sstream>
namespace asio      = boost::asio;
using tcp = asio::ip::tcp;
namespace beast = boost::beast;
namespace websocket = boost::beast::websocket;

struct WebSocketClientOrder {
    static const std::string DEFAULT_URI;
    static const int RECONNECTION_TIMEOUT = 5;

    WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, std::string api_key,
                         std::string api_secret);

    void connect(const std::string& host, const std::string& port);
    void run() const;
    void place_order(std::string const& symbol_first, std::string const& symbol_second,
                     std::string const& side_first, std::string const& side_second, double qty);
    void close();
    void send_async(std::string const& message);

  private:
    std::mutex              message_queue_mutex_;
    std::queue<std::string> message_queue_;

    void        do_read();
    void        reconnect();
    void        process_next_message();
    void        authenticate();
    std::string generate_signature(long expires);

    asio::io_context&                                 io_context_;
    tcp::resolver                                     resolver_;
    websocket::stream<asio::ssl::stream<tcp::socket>> ws_;
    beast::flat_buffer                                buffer_;

    std::string api_key_, api_secret_;
    std::string host_, port_;
};

const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";

WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context,
                                           std::string api_key, std::string api_secret)
    : io_context_(io_context)
    , resolver_(io_context)
    , ws_(asio::make_strand(io_context), ssl_context)
    , api_key_(std::move(api_key))
    , api_secret_(std::move(api_secret)) {}

/**
 * Establishes a connection to the exchange's server
 * @param host
 * @param port
 */
void WebSocketClientOrder::connect(const std::string& host, const std::string& port) {
    host_ = host;
    port_ = port;

    try {
        auto results = resolver_.resolve(host, port);
        asio::connect(ws_.next_layer().next_layer(), results);

        if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host.c_str())) {
            throw beast::system_error(
                beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
                "[WSCO]: Failed to set SNI"
            );
        }

        ws_.next_layer().handshake(asio::ssl::stream_base::client);
        ws_.handshake(host + ":" + port, DEFAULT_URI);
        std::cout << "[WSCO]: Connected to " << host << std::endl;

        authenticate();
        do_read();
    } catch (const std::exception& e) {
        std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying..." << std::endl;
        reconnect();
    }
}

/**
 * Keeps the connection up to date
 */
void WebSocketClientOrder::run() const { io_context_.run(); }

/**
 * Sends two orders to the exchange via the WS channel
 * @link http://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
 */
void WebSocketClientOrder::place_order([[maybe_unused]] std::string const& symbol_first,
                                       [[maybe_unused]] std::string const& symbol_second,
                                       [[maybe_unused]] std::string const& side_first,
                                       [[maybe_unused]] std::string const& side_second, //
                                       double const                        qty) {
    static std::string const qty_str     = std::to_string(qty); // Caching qty in string format
    static const std::string recv_window = "8000";

    const long timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
    const std::string timestamp_str = std::to_string(timestamp);

    // todo - call send_async
}

/**
 * Performs service data processing before parsing
 */
void WebSocketClientOrder::do_read() {
    ws_.async_read(buffer_, [this](beast::error_code ec, size_t bytes_transferred) {
        if (ec) {
            std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting..." << std::endl;
            reconnect();
            return;
        }

        std::string response = beast::buffers_to_string(buffer_.data());
        std::cout << "[WSCO]: Server Response: " << response << std::endl;
        buffer_.consume(bytes_transferred);

        do_read();
    });
}


/**
 * Reconnects the websocket connection in case of loss.
 */
void WebSocketClientOrder::reconnect() {
    close();
    std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
    connect(host_, port_);
}

/**
 * Closes the SSL session correctly
 */
void WebSocketClientOrder::close() {
    try {
        if (ws_.is_open()) {
            beast::error_code ec;
            ws_.close(websocket::close_code::normal, ec);
            if (ec && ec != beast::errc::not_connected) {
                std::cerr << "[WSCO]: Close failed: " << ec.message() << std::endl;
            }
        }

        beast::error_code ec;

        // Checking whether the SSL layer is open before attempting to close
        if (ws_.next_layer().lowest_layer().is_open()) {
            ws_.next_layer().shutdown(ec);
            if (ec && ec != asio::error::eof && ec != asio::ssl::error::stream_truncated) {
                std::cerr << "[WSCO]: SSL shutdown failed: " << ec.message() << std::endl;
            }
        }

        // Closing the socket
        ws_.next_layer().lowest_layer().close(ec);
        if (ec) {
            std::cerr << "[WSCO]: Socket close failed: " << ec.message() << std::endl;
        }

    } catch (const std::exception &e) {
        std::cerr << "[WSCO]: Exception during close: " << e.what() << std::endl;
    }
}


/**
 * Sends the message asynchronously, adding it to a queue for sequential sending
 */
void WebSocketClientOrder::send_async(const std::string& message) {
    // Lock the mutex to ensure thread-safety when accessing the queue
    {
        std::lock_guard<std::mutex> lock(message_queue_mutex_);
        message_queue_.push(message);
    }

    // If this is the first message in the queue, start sending
    if (message_queue_.size() == 1) {
        process_next_message();
    }
}

/**
 * Processes and sends the next message in the queue
 */
void WebSocketClientOrder::process_next_message() {
    std::lock_guard<std::mutex> lock(message_queue_mutex_);

    if (!message_queue_.empty()) {
        std::string message = message_queue_.front();

        ws_.async_write(
            asio::buffer(message),
            [this, message](beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
                if (ec) {
                    std::cerr << "[WSCO]: Async send failed: " << ec.message() << std::endl;
                    reconnect();
                    return;
                }
                std::cout << "[WSCO]: Async message sent: " << message << std::endl;

                {
                    std::lock_guard<std::mutex> lock(message_queue_mutex_);
                    message_queue_.pop();
                }

                process_next_message();
            });
    }
}

/**
 * Generates an hmac signature for the request
 */
std::string WebSocketClientOrder::generate_signature(long expires) {
    std::string data = "GET/realtime" + std::to_string(expires);
    unsigned char hmac_result[EVP_MAX_MD_SIZE];
    unsigned int len = 0;
    HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
         reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
    std::ostringstream oss;
    for (unsigned int i = 0; i < len; i++) {
        oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
    }
    return oss.str();
}

/**
 * Performs authentication on the server
 */
void WebSocketClientOrder::authenticate() {
    long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
    std::string signature = generate_signature(expires);

    nlohmann::json auth_message = {
        {"op", "auth"},
        {"args", {api_key_, std::to_string(expires), signature}}
    };
    send_async(auth_message.dump());
}

int main() {
    asio::io_context io_context;
    asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);

    WebSocketClientOrder client(io_context, ssl_context, "nPlgUfp7VapVOrZi3e",
                                "wL9k4of15mXmEY3DALcBUNQGagW3oq8IJXmg");
    client.connect("stream.bybit.com", "443");
    client.run();
}

Solution

  • After making your question self-contained, I realized it did not reproduce your timeout behavior. I made it reproduce it by setting stringent timeout options:

    ws_.set_option(websocket::stream_base::timeout{
        std::chrono::seconds(3), // handshake timeout
        std::chrono::seconds(3), // idle timeout
        false,
    });
    asio::connect(ws_.next_layer().next_layer(), results);
    

    Now you see:

    [WSCO]: Connected to stream.bybit.com
    [WSCO]: Async message sent: {"args":["nPlgUfp7VapVOrZi3e","1738963686341","4dbce3356ec84cbe463a75d572624101a4c0332f76101332c9d3b86d9c18cb1a"],"op":"auth"}
    [WSCO]: Read error: The socket was closed due to a timeout. Reconnecting...
    [WSCO]: Connection failed: packet length too long (SSL routines) [asio.ssl:167772358]. Retrying...
    [WSCO]: SSL shutdown failed: shutdown while in init (SSL routines)
    

    Clearly, the problem was re-using the underlying SSL stream. I'd suggest creating a new stream instance each time:

    std::optional<Stream> ws_;
    asio::ssl::context&   ssl_context_;
    

    Then at the end of close():

    ws_.reset();
    

    And in reconnect:

    ws_.emplace(asio::make_strand(io_context_), ssl_context_);
    host_ = host;
    port_ = port;
    

    Now you can see:

    Full Listing With Fixes:

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <iomanip>
    #include <iostream>
    #include <nlohmann/json.hpp>
    #include <openssl/hmac.h>
    #include <queue>
    #include <sstream>
    namespace asio      = boost::asio;
    using tcp = asio::ip::tcp;
    namespace beast = boost::beast;
    namespace websocket = boost::beast::websocket;
    
    struct WebSocketClientOrder {
        static const std::string DEFAULT_URI;
        static const int RECONNECTION_TIMEOUT = 5;
    
        WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, std::string api_key,
                             std::string api_secret);
    
        void connect(const std::string& host, const std::string& port);
        void run() const;
        void place_order(std::string const& symbol_first, std::string const& symbol_second,
                         std::string const& side_first, std::string const& side_second, double qty);
        void close();
        void send_async(std::string const& message);
    
      private:
        std::mutex              message_queue_mutex_;
        std::queue<std::string> message_queue_;
    
        void        do_read();
        void        reconnect();
        void        process_next_message();
        void        authenticate();
        std::string generate_signature(long expires);
    
        using Stream = websocket::stream<asio::ssl::stream<tcp::socket>>;
    
        asio::io_context&     io_context_;
        tcp::resolver         resolver_;
        std::optional<Stream> ws_;
        asio::ssl::context&   ssl_context_;
        beast::flat_buffer    buffer_;
    
        std::string api_key_, api_secret_;
        std::string host_, port_;
    };
    
    const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
    
    WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context,
                                               std::string api_key, std::string api_secret)
        : io_context_(io_context)
        , resolver_(io_context)
        , ssl_context_(ssl_context)
        , api_key_(std::move(api_key))
        , api_secret_(std::move(api_secret)) {}
    
    /**
     * Establishes a connection to the exchange's server
     * @param host
     * @param port
     */
    void WebSocketClientOrder::connect(const std::string& host, const std::string& port) {
        ws_.emplace(asio::make_strand(io_context_), ssl_context_);
        host_ = host;
        port_ = port;
    
        try {
            auto results = resolver_.resolve(host, port);
            ws_->set_option(websocket::stream_base::timeout{
                std::chrono::seconds(3), // handshake timeout
                std::chrono::seconds(3), // idle timeout
                false,
            });
            asio::connect(ws_->next_layer().next_layer(), results);
    
            if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
                throw beast::system_error(
                    beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
                    "[WSCO]: Failed to set SNI"
                );
            }
    
            ws_->next_layer().handshake(asio::ssl::stream_base::client);
            ws_->handshake(host + ":" + port, DEFAULT_URI);
            std::cout << "[WSCO]: Connected to " << host << std::endl;
    
            authenticate();
            do_read();
        } catch (const std::exception& e) {
            std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying..." << std::endl;
            reconnect();
        }
    }
    
    /**
     * Keeps the connection up to date
     */
    void WebSocketClientOrder::run() const { io_context_.run(); }
    
    /**
     * Sends two orders to the exchange via the WS channel
     * @link http://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
     */
    void WebSocketClientOrder::place_order([[maybe_unused]] std::string const& symbol_first,
                                           [[maybe_unused]] std::string const& symbol_second,
                                           [[maybe_unused]] std::string const& side_first,
                                           [[maybe_unused]] std::string const& side_second, //
                                           double const                        qty) {
        static std::string const qty_str     = std::to_string(qty); // Caching qty in string format
        static const std::string recv_window = "8000";
    
        const long timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
        const std::string timestamp_str = std::to_string(timestamp);
    
        // todo - call send_async
    }
    
    /**
     * Performs service data processing before parsing
     */
    void WebSocketClientOrder::do_read() {
        ws_->async_read(buffer_, [this](beast::error_code ec, size_t bytes_transferred) {
            if (ec) {
                std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting..." << std::endl;
                reconnect();
                return;
            }
    
            std::string response = beast::buffers_to_string(buffer_.data()).substr(0, bytes_transferred);
            std::cout << "[WSCO]: Server Response: " << response << std::endl;
            buffer_.consume(bytes_transferred);
    
            do_read();
        });
    }
    
    
    /**
     * Reconnects the websocket connection in case of loss.
     */
    void WebSocketClientOrder::reconnect() {
        close();
    
        std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
        connect(host_, port_);
    }
    
    /**
     * Closes the SSL session correctly
     */
    void WebSocketClientOrder::close() {
        try {
            if (ws_->is_open()) {
                beast::error_code ec;
                ws_->close(websocket::close_code::normal, ec);
                if (ec && ec != beast::errc::not_connected) {
                    std::cerr << "[WSCO]: Close failed: " << ec.message() << std::endl;
                }
            }
    
            beast::error_code ec;
    
            // Checking whether the SSL layer is open before attempting to close
            if (ws_->next_layer().lowest_layer().is_open()) {
                ws_->next_layer().shutdown(ec);
                if (ec && ec != asio::error::eof && ec != asio::ssl::error::stream_truncated) {
                    std::cerr << "[WSCO]: SSL shutdown failed: " << ec.message() << std::endl;
                }
            }
    
            // Closing the socket
            ws_->next_layer().lowest_layer().close(ec);
            if (ec) {
                std::cerr << "[WSCO]: Socket close failed: " << ec.message() << std::endl;
            }
    
        } catch (const std::exception &e) {
            std::cerr << "[WSCO]: Exception during close: " << e.what() << std::endl;
        }
        ws_.reset();
    }
    
    /**
     * Sends the message asynchronously, adding it to a queue for sequential sending
     */
    void WebSocketClientOrder::send_async(const std::string& message) {
        // Lock the mutex to ensure thread-safety when accessing the queue
        {
            std::lock_guard<std::mutex> lock(message_queue_mutex_);
            message_queue_.push(message);
        }
    
        // If this is the first message in the queue, start sending
        if (message_queue_.size() == 1) {
            process_next_message();
        }
    }
    
    /**
     * Processes and sends the next message in the queue
     */
    void WebSocketClientOrder::process_next_message() {
        std::lock_guard<std::mutex> lock(message_queue_mutex_);
    
        if (!message_queue_.empty()) {
            std::string message = message_queue_.front();
    
            ws_->async_write(
                asio::buffer(message),
                [this, message](beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
                    if (ec) {
                        std::cerr << "[WSCO]: Async send failed: " << ec.message() << std::endl;
                        reconnect();
                        return;
                    }
                    std::cout << "[WSCO]: Async message sent: " << message << std::endl;
    
                    {
                        std::lock_guard<std::mutex> lock(message_queue_mutex_);
                        message_queue_.pop();
                    }
    
                    process_next_message();
                });
        }
    }
    
    /**
     * Generates an hmac signature for the request
     */
    std::string WebSocketClientOrder::generate_signature(long expires) {
        std::string data = "GET/realtime" + std::to_string(expires);
        unsigned char hmac_result[EVP_MAX_MD_SIZE];
        unsigned int len = 0;
        HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
             reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
        std::ostringstream oss;
        for (unsigned int i = 0; i < len; i++) {
            oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
        }
        return oss.str();
    }
    
    /**
     * Performs authentication on the server
     */
    void WebSocketClientOrder::authenticate() {
        long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
        std::string signature = generate_signature(expires);
    
        nlohmann::json auth_message = {
            {"op", "auth"},
            {"args", {api_key_, std::to_string(expires), signature}}
        };
        send_async(auth_message.dump());
    }
    
    int main() {
        asio::io_context io_context;
        asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);
    
        WebSocketClientOrder client(io_context, ssl_context, "nPlgUfp7VapVOrZi3e",
                                    "wL9k4of15mXmEY3DALcBUNQGagW3oq8IJXmg");
        client.connect("stream.bybit.com", "443");
        client.run();
    }