can you tell me where to look for the problem, I've been sitting for a couple of days now and I can't figure out what's the matter?
My WS client successfully login and connects to the server, after which it waits for data to be sent from the main application.
after 2-3 minutes of stable work, I get an error:
[WSCO]: Connection failed: Operation canceled [system:125 at /usr/include/boost/beast/websocket/impl/stream_impl.hpp:367:13 in function 'bool boost::beast::websocket::stream< , >::impl_type::check_stop_now(boost::beast::error_code&)']. Retrying...
After that, the cat tries to reconnect an infinite number of times (as it should be), but any attempt to reconnect immediately ends with the same error immediately
At the same time, no WS data was sent at that moment. Can you tell me how to fix this, or at least understand what the problem might be?
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <iomanip>
#include <iostream>
#include <nlohmann/json.hpp>
#include <openssl/hmac.h>
#include <queue>
#include <sstream>
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
namespace beast = boost::beast;
namespace websocket = boost::beast::websocket;
struct WebSocketClientOrder {
static const std::string DEFAULT_URI;
static const int RECONNECTION_TIMEOUT = 5;
WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, std::string api_key,
std::string api_secret);
void connect(const std::string& host, const std::string& port);
void run() const;
void place_order(std::string const& symbol_first, std::string const& symbol_second,
std::string const& side_first, std::string const& side_second, double qty);
void close();
void send_async(std::string const& message);
private:
std::mutex message_queue_mutex_;
std::queue<std::string> message_queue_;
void do_read();
void reconnect();
void process_next_message();
void authenticate();
std::string generate_signature(long expires);
asio::io_context& io_context_;
tcp::resolver resolver_;
websocket::stream<asio::ssl::stream<tcp::socket>> ws_;
beast::flat_buffer buffer_;
std::string api_key_, api_secret_;
std::string host_, port_;
};
const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context,
std::string api_key, std::string api_secret)
: io_context_(io_context)
, resolver_(io_context)
, ws_(asio::make_strand(io_context), ssl_context)
, api_key_(std::move(api_key))
, api_secret_(std::move(api_secret)) {}
/**
* Establishes a connection to the exchange's server
* @param host
* @param port
*/
void WebSocketClientOrder::connect(const std::string& host, const std::string& port) {
host_ = host;
port_ = port;
try {
auto results = resolver_.resolve(host, port);
asio::connect(ws_.next_layer().next_layer(), results);
if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host.c_str())) {
throw beast::system_error(
beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
"[WSCO]: Failed to set SNI"
);
}
ws_.next_layer().handshake(asio::ssl::stream_base::client);
ws_.handshake(host + ":" + port, DEFAULT_URI);
std::cout << "[WSCO]: Connected to " << host << std::endl;
authenticate();
do_read();
} catch (const std::exception& e) {
std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying..." << std::endl;
reconnect();
}
}
/**
* Keeps the connection up to date
*/
void WebSocketClientOrder::run() const { io_context_.run(); }
/**
* Sends two orders to the exchange via the WS channel
* @link http://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
*/
void WebSocketClientOrder::place_order([[maybe_unused]] std::string const& symbol_first,
[[maybe_unused]] std::string const& symbol_second,
[[maybe_unused]] std::string const& side_first,
[[maybe_unused]] std::string const& side_second, //
double const qty) {
static std::string const qty_str = std::to_string(qty); // Caching qty in string format
static const std::string recv_window = "8000";
const long timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
const std::string timestamp_str = std::to_string(timestamp);
// todo - call send_async
}
/**
* Performs service data processing before parsing
*/
void WebSocketClientOrder::do_read() {
ws_.async_read(buffer_, [this](beast::error_code ec, size_t bytes_transferred) {
if (ec) {
std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting..." << std::endl;
reconnect();
return;
}
std::string response = beast::buffers_to_string(buffer_.data());
std::cout << "[WSCO]: Server Response: " << response << std::endl;
buffer_.consume(bytes_transferred);
do_read();
});
}
/**
* Reconnects the websocket connection in case of loss.
*/
void WebSocketClientOrder::reconnect() {
close();
std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
connect(host_, port_);
}
/**
* Closes the SSL session correctly
*/
void WebSocketClientOrder::close() {
try {
if (ws_.is_open()) {
beast::error_code ec;
ws_.close(websocket::close_code::normal, ec);
if (ec && ec != beast::errc::not_connected) {
std::cerr << "[WSCO]: Close failed: " << ec.message() << std::endl;
}
}
beast::error_code ec;
// Checking whether the SSL layer is open before attempting to close
if (ws_.next_layer().lowest_layer().is_open()) {
ws_.next_layer().shutdown(ec);
if (ec && ec != asio::error::eof && ec != asio::ssl::error::stream_truncated) {
std::cerr << "[WSCO]: SSL shutdown failed: " << ec.message() << std::endl;
}
}
// Closing the socket
ws_.next_layer().lowest_layer().close(ec);
if (ec) {
std::cerr << "[WSCO]: Socket close failed: " << ec.message() << std::endl;
}
} catch (const std::exception &e) {
std::cerr << "[WSCO]: Exception during close: " << e.what() << std::endl;
}
}
/**
* Sends the message asynchronously, adding it to a queue for sequential sending
*/
void WebSocketClientOrder::send_async(const std::string& message) {
// Lock the mutex to ensure thread-safety when accessing the queue
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
message_queue_.push(message);
}
// If this is the first message in the queue, start sending
if (message_queue_.size() == 1) {
process_next_message();
}
}
/**
* Processes and sends the next message in the queue
*/
void WebSocketClientOrder::process_next_message() {
std::lock_guard<std::mutex> lock(message_queue_mutex_);
if (!message_queue_.empty()) {
std::string message = message_queue_.front();
ws_.async_write(
asio::buffer(message),
[this, message](beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
if (ec) {
std::cerr << "[WSCO]: Async send failed: " << ec.message() << std::endl;
reconnect();
return;
}
std::cout << "[WSCO]: Async message sent: " << message << std::endl;
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
message_queue_.pop();
}
process_next_message();
});
}
}
/**
* Generates an hmac signature for the request
*/
std::string WebSocketClientOrder::generate_signature(long expires) {
std::string data = "GET/realtime" + std::to_string(expires);
unsigned char hmac_result[EVP_MAX_MD_SIZE];
unsigned int len = 0;
HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
std::ostringstream oss;
for (unsigned int i = 0; i < len; i++) {
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
}
return oss.str();
}
/**
* Performs authentication on the server
*/
void WebSocketClientOrder::authenticate() {
long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
std::string signature = generate_signature(expires);
nlohmann::json auth_message = {
{"op", "auth"},
{"args", {api_key_, std::to_string(expires), signature}}
};
send_async(auth_message.dump());
}
int main() {
asio::io_context io_context;
asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);
WebSocketClientOrder client(io_context, ssl_context, "nPlgUfp7VapVOrZi3e",
"wL9k4of15mXmEY3DALcBUNQGagW3oq8IJXmg");
client.connect("stream.bybit.com", "443");
client.run();
}
After making your question self-contained, I realized it did not reproduce your timeout behavior. I made it reproduce it by setting stringent timeout options:
ws_.set_option(websocket::stream_base::timeout{
std::chrono::seconds(3), // handshake timeout
std::chrono::seconds(3), // idle timeout
false,
});
asio::connect(ws_.next_layer().next_layer(), results);
Now you see:
[WSCO]: Connected to stream.bybit.com
[WSCO]: Async message sent: {"args":["nPlgUfp7VapVOrZi3e","1738963686341","4dbce3356ec84cbe463a75d572624101a4c0332f76101332c9d3b86d9c18cb1a"],"op":"auth"}
[WSCO]: Read error: The socket was closed due to a timeout. Reconnecting...
[WSCO]: Connection failed: packet length too long (SSL routines) [asio.ssl:167772358]. Retrying...
[WSCO]: SSL shutdown failed: shutdown while in init (SSL routines)
Clearly, the problem was re-using the underlying SSL stream. I'd suggest creating a new stream instance each time:
std::optional<Stream> ws_;
asio::ssl::context& ssl_context_;
Then at the end of close()
:
ws_.reset();
And in reconnect:
ws_.emplace(asio::make_strand(io_context_), ssl_context_);
host_ = host;
port_ = port;
Now you can see:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <iomanip>
#include <iostream>
#include <nlohmann/json.hpp>
#include <openssl/hmac.h>
#include <queue>
#include <sstream>
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
namespace beast = boost::beast;
namespace websocket = boost::beast::websocket;
struct WebSocketClientOrder {
static const std::string DEFAULT_URI;
static const int RECONNECTION_TIMEOUT = 5;
WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, std::string api_key,
std::string api_secret);
void connect(const std::string& host, const std::string& port);
void run() const;
void place_order(std::string const& symbol_first, std::string const& symbol_second,
std::string const& side_first, std::string const& side_second, double qty);
void close();
void send_async(std::string const& message);
private:
std::mutex message_queue_mutex_;
std::queue<std::string> message_queue_;
void do_read();
void reconnect();
void process_next_message();
void authenticate();
std::string generate_signature(long expires);
using Stream = websocket::stream<asio::ssl::stream<tcp::socket>>;
asio::io_context& io_context_;
tcp::resolver resolver_;
std::optional<Stream> ws_;
asio::ssl::context& ssl_context_;
beast::flat_buffer buffer_;
std::string api_key_, api_secret_;
std::string host_, port_;
};
const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context,
std::string api_key, std::string api_secret)
: io_context_(io_context)
, resolver_(io_context)
, ssl_context_(ssl_context)
, api_key_(std::move(api_key))
, api_secret_(std::move(api_secret)) {}
/**
* Establishes a connection to the exchange's server
* @param host
* @param port
*/
void WebSocketClientOrder::connect(const std::string& host, const std::string& port) {
ws_.emplace(asio::make_strand(io_context_), ssl_context_);
host_ = host;
port_ = port;
try {
auto results = resolver_.resolve(host, port);
ws_->set_option(websocket::stream_base::timeout{
std::chrono::seconds(3), // handshake timeout
std::chrono::seconds(3), // idle timeout
false,
});
asio::connect(ws_->next_layer().next_layer(), results);
if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
throw beast::system_error(
beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
"[WSCO]: Failed to set SNI"
);
}
ws_->next_layer().handshake(asio::ssl::stream_base::client);
ws_->handshake(host + ":" + port, DEFAULT_URI);
std::cout << "[WSCO]: Connected to " << host << std::endl;
authenticate();
do_read();
} catch (const std::exception& e) {
std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying..." << std::endl;
reconnect();
}
}
/**
* Keeps the connection up to date
*/
void WebSocketClientOrder::run() const { io_context_.run(); }
/**
* Sends two orders to the exchange via the WS channel
* @link http://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
*/
void WebSocketClientOrder::place_order([[maybe_unused]] std::string const& symbol_first,
[[maybe_unused]] std::string const& symbol_second,
[[maybe_unused]] std::string const& side_first,
[[maybe_unused]] std::string const& side_second, //
double const qty) {
static std::string const qty_str = std::to_string(qty); // Caching qty in string format
static const std::string recv_window = "8000";
const long timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
const std::string timestamp_str = std::to_string(timestamp);
// todo - call send_async
}
/**
* Performs service data processing before parsing
*/
void WebSocketClientOrder::do_read() {
ws_->async_read(buffer_, [this](beast::error_code ec, size_t bytes_transferred) {
if (ec) {
std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting..." << std::endl;
reconnect();
return;
}
std::string response = beast::buffers_to_string(buffer_.data()).substr(0, bytes_transferred);
std::cout << "[WSCO]: Server Response: " << response << std::endl;
buffer_.consume(bytes_transferred);
do_read();
});
}
/**
* Reconnects the websocket connection in case of loss.
*/
void WebSocketClientOrder::reconnect() {
close();
std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
connect(host_, port_);
}
/**
* Closes the SSL session correctly
*/
void WebSocketClientOrder::close() {
try {
if (ws_->is_open()) {
beast::error_code ec;
ws_->close(websocket::close_code::normal, ec);
if (ec && ec != beast::errc::not_connected) {
std::cerr << "[WSCO]: Close failed: " << ec.message() << std::endl;
}
}
beast::error_code ec;
// Checking whether the SSL layer is open before attempting to close
if (ws_->next_layer().lowest_layer().is_open()) {
ws_->next_layer().shutdown(ec);
if (ec && ec != asio::error::eof && ec != asio::ssl::error::stream_truncated) {
std::cerr << "[WSCO]: SSL shutdown failed: " << ec.message() << std::endl;
}
}
// Closing the socket
ws_->next_layer().lowest_layer().close(ec);
if (ec) {
std::cerr << "[WSCO]: Socket close failed: " << ec.message() << std::endl;
}
} catch (const std::exception &e) {
std::cerr << "[WSCO]: Exception during close: " << e.what() << std::endl;
}
ws_.reset();
}
/**
* Sends the message asynchronously, adding it to a queue for sequential sending
*/
void WebSocketClientOrder::send_async(const std::string& message) {
// Lock the mutex to ensure thread-safety when accessing the queue
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
message_queue_.push(message);
}
// If this is the first message in the queue, start sending
if (message_queue_.size() == 1) {
process_next_message();
}
}
/**
* Processes and sends the next message in the queue
*/
void WebSocketClientOrder::process_next_message() {
std::lock_guard<std::mutex> lock(message_queue_mutex_);
if (!message_queue_.empty()) {
std::string message = message_queue_.front();
ws_->async_write(
asio::buffer(message),
[this, message](beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
if (ec) {
std::cerr << "[WSCO]: Async send failed: " << ec.message() << std::endl;
reconnect();
return;
}
std::cout << "[WSCO]: Async message sent: " << message << std::endl;
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
message_queue_.pop();
}
process_next_message();
});
}
}
/**
* Generates an hmac signature for the request
*/
std::string WebSocketClientOrder::generate_signature(long expires) {
std::string data = "GET/realtime" + std::to_string(expires);
unsigned char hmac_result[EVP_MAX_MD_SIZE];
unsigned int len = 0;
HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
std::ostringstream oss;
for (unsigned int i = 0; i < len; i++) {
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
}
return oss.str();
}
/**
* Performs authentication on the server
*/
void WebSocketClientOrder::authenticate() {
long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
std::string signature = generate_signature(expires);
nlohmann::json auth_message = {
{"op", "auth"},
{"args", {api_key_, std::to_string(expires), signature}}
};
send_async(auth_message.dump());
}
int main() {
asio::io_context io_context;
asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);
WebSocketClientOrder client(io_context, ssl_context, "nPlgUfp7VapVOrZi3e",
"wL9k4of15mXmEY3DALcBUNQGagW3oq8IJXmg");
client.connect("stream.bybit.com", "443");
client.run();
}