Search code examples
c++ rest boost boost-beast

Boost Beast, Handshake, Keep-Alive


I am trying to set up a service, based on Boost Beast, for querying a remote database. Even though the context and the issues are totally different, I found this question on SO: "HTTP Delay from AWS Instance to Bitmex with Boost Beast and Ubuntu 18", and I have tried to build a service from that example implementation. However, in that example, the requests are sent from within the handshake handler

void
REST_on_handshake(beast::error_code ec)

and I need to be able to send requests after the handshake has been done (with a connection still alive). So, if I naively empty the body of REST_on_handshake, then when I send the requests from another function I get the error (which I suppose is expected behavior):

terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
what():  end of stream

A simple fix is to send an empty request to the server right after the function

rest_ioc.run();

then send new requests from a function; this seems to work, but the undesirable effect is that at every new request, that initial empty request is resent just before the legitimate new request. I have no idea why.

So, what is the correct way to send requests outside the function REST_on_handshake ?

EDIT: Below is the original code from the example mentioned above, with the following modifications:

  1. The function write_after_handshake sends an incomplete request to establish and maintain connection, as REST_on_handshake is empty.

  2. REST_write_limit_order_bulk is (public and) outside REST_on_handshake as obviously in my own application I need to send requests after the initial handshake.

     // g++ -std=c++17 -pthread -o http_test.out http_test.cpp -lssl -lcrypto && ./http_test.out
    
     //Boost & Beast headers
     #include <boost/bind.hpp>
     #include <boost/beast/core.hpp>
     #include <boost/beast/core.hpp>
     #include <boost/beast/http.hpp>
     #include <boost/beast/ssl.hpp>
     #include <boost/beast/version.hpp>
     #include <boost/beast/websocket.hpp>
     #include <boost/beast/websocket/ssl.hpp>
     #include <boost/asio/strand.hpp>
     #include <boost/asio/connect.hpp>
     #include <boost/asio/ip/tcp.hpp>
     #include <boost/asio/ssl/stream.hpp>
     #include <boost/optional.hpp>
    
     #include <thread>
    
     //REST headers
     #include <sstream>
     #include <openssl/evp.h>
     #include <openssl/hmac.h>
    
     //Misc. headers
     #include <iomanip>
     #include <iostream>
     #include <string>
    
     namespace beast     = boost::beast;         // from <boost/beast.hpp>
     namespace http      = beast::http;          // from <boost/beast/http.hpp>
     namespace websocket = beast::websocket;     // from <boost/beast/websocket.hpp>
     namespace net       = boost::asio;          // from <boost/asio.hpp>
     namespace ssl       = boost::asio::ssl;     // from <boost/asio/ssl.hpp>
     using     tcp       = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
    
     using namespace std;
    
    
     class BitMEX_MM : public std::enable_shared_from_this<BitMEX_MM> {
    
         int n_tests = 1;
    
         //REST
         tcp::resolver rest_resolver;
         beast::ssl_stream<beast::tcp_stream> rest_stream;
         beast::flat_buffer rest_buffer;
    
         http::request<http::string_body>  post_req;
         http::response<http::string_body> post_res;
    
         string limit_order_msg;
    
         // Timing
         struct timespec start, end;
    
         //MEMBER VARIABLES
         string apiKey    = ""; //FILL IN API KEY
         string apiSecret = ""; //FILL IN API SEC
         int    apiKeyLen = apiKey.length();
         const char* apiKeyCStr = apiKey.c_str();
         int    apiSecLen = apiSecret.length();
         const char* apiSecCStr = apiSecret.c_str();
         int    expiry_t  = 5;
    
         //REST FUNCTIONS
         static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp)
         {
             ((string*)userp)->append((char*)contents, size * nmemb);
             return size * nmemb;
         }
    
         string HMAC_SHA256_hex_POST(string valid_till)
         {
             string data = "POST/api/v1/order" + valid_till + limit_order_msg;
    
             stringstream ss;
             unsigned int len;
             unsigned char out[EVP_MAX_MD_SIZE];
             HMAC_CTX *ctx = HMAC_CTX_new();
             HMAC_Init_ex(ctx, apiSecCStr, apiSecLen, EVP_sha256(), NULL);
             HMAC_Update(ctx, (unsigned char*)data.c_str(), data.length());
             HMAC_Final(ctx, out, &len);
             HMAC_CTX_free(ctx);
    
             for (int i = 0; i < len; ++i)
             {
                 ss << std::setw(2) << std::setfill('0') << hex << (unsigned int)out[i];
             }
             return ss.str();
         }
    
         void
         REST_on_resolve(
             beast::error_code ec,
             tcp::resolver::results_type results)
         {
             // Make the connection on the IP address we get from a lookup
             beast::get_lowest_layer(rest_stream).async_connect(
                 results,
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_connect,
                     shared_from_this()));
         }
    
         void
         REST_on_connect(beast::error_code ec,
                         tcp::resolver::results_type::endpoint_type)
         {       
             // Perform the SSL handshake
             rest_stream.async_handshake(
                 ssl::stream_base::client,
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_handshake,
                     shared_from_this()));
         }
    
         void
         REST_on_handshake(beast::error_code ec)
         {       
     /*        limit_order_msg += "{\"symbol\":\"XBTUSD\",\"ordType\":\"Limit\",\"execInst\":\"ParticipateDoNotInitiate\",\"clOrdID\":\"" + to_string(n_tests) \
                             + "\",\"side\":\"Buy\",\"price\":10.0" \
                             + ",\"orderQty\":2}]}";
             REST_write_limit_order_bulk();*/
         }
    
     public:
    
         explicit
         BitMEX_MM(net::io_context& rest_ioc, ssl::context& rest_ctx)
             : rest_resolver(net::make_strand(rest_ioc)),
             rest_stream(net::make_strand(rest_ioc), rest_ctx)
         { }
    
         void
         run_REST_service()
         {           
             // Set SNI Hostname (many hosts need this to handshake successfully)
             if(! SSL_set_tlsext_host_name(rest_stream.native_handle(), "www.bitmex.com"))
             {
                 beast::error_code ec{static_cast<int>(::ERR_get_error()), net::error::get_ssl_category()};
                 std::cerr << "ssl err " << ec.message() << "\n";
                 return;
             }
    
             // Set up an HTTP GET request message
             post_req.version(11);
             post_req.method(http::verb::post);
             post_req.target("/api/v1/order");
             post_req.set(http::field::host, "www.bitmex.com");
             post_req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
             post_req.set(http::field::accept, "*/*");
             post_req.set(http::field::content_type, "application/json");
             post_req.set(http::field::connection, "Keep-Alive");
             post_req.set("api-key", apiKey);
             post_req.insert("Content-Length", "");
             post_req.insert("api-expires", "");
             post_req.insert("api-signature", "");
    
             // Look up the domain name
             rest_resolver.async_resolve(
                 "www.bitmex.com",
                 "443",
                 beast::bind_front_handler(
                     &BitMEX_MM::REST_on_resolve,
                     shared_from_this()));
    
         }
    
         void write_after_handshake()
         {
             limit_order_msg = "";  // Empty message to establish and maintain connection
    
             int valid_till        = time(0) + 5;
             string valid_till_str = to_string(valid_till);
    
             post_req.set("api-expires", valid_till_str);
             post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str));
             post_req.set("Content-Length", to_string(limit_order_msg.length()));
             post_req.body() = limit_order_msg;        
    
             beast::error_code _ec;
             std::size_t       _bt;
    
             http::write(rest_stream, post_req);
             http::read(rest_stream, rest_buffer, post_res);
    
             cout << "request (initial): \n" << post_req << endl;
             cout << "response (initial): \n" << post_res << endl;        
         }
    
         void REST_write_limit_order_bulk()
         {
             int valid_till        = time(0) + 5;
             string valid_till_str = to_string(valid_till);
    
             post_req.set("api-expires", valid_till_str);
             post_req.set("api-signature", HMAC_SHA256_hex_POST(valid_till_str));
             post_req.set("Content-Length", to_string(limit_order_msg.length()));
             post_req.body() = limit_order_msg;
    
             clock_gettime(CLOCK_MONOTONIC, &start);
    
             http::write(rest_stream, post_req);
             http::read(rest_stream, rest_buffer, post_res);
    
             cout << "request: \n" << post_req << endl;
             cout << "response: \n" << post_res << endl;
    
             beast::error_code _ec;
             std::size_t       _bt;
             process_limit_order_bulk_res(_ec, _bt);
         }
    
         void process_limit_order_bulk_res(beast::error_code ec,
                                           std::size_t bytes_transferred)
         {
             clock_gettime(CLOCK_MONOTONIC, &end);
             double time_taken;
             time_taken = (end.tv_sec  - start.tv_sec) + ((end.tv_nsec - start.tv_nsec) * 1e-9);
             cout << "response time: " << time_taken << endl;
    
             ++n_tests;
    
             if (n_tests <= 5)
             {
                 limit_order_msg = "{\"symbol\":\"XBTUSD\",\"ordType\":\"Limit\",\"execInst\":\"ParticipateDoNotInitiate\",\"side\":\"Buy\",\"price\":10.0,\"orderQty\":1.0}";
                 REST_write_limit_order_bulk();
             }
         }
     };
    
    
     int main(int argc, char** argv)
     {
    
         net::io_context rest_ioc;
         ssl::context    rest_ctx{ssl::context::tlsv12_client};
    
         auto algo = make_shared<BitMEX_MM>(rest_ioc, rest_ctx);
    
         cout << "Running http test." << endl;
    
         algo->run_REST_service();
    
         rest_ioc.run();
    
         algo->write_after_handshake();
    
         std::this_thread::sleep_for(std::chrono::milliseconds(30 * 1000));
    
         algo->REST_write_limit_order_bulk();  // Requests sent outside handshake
    
         return 0;
     }
    

The initial request sent in write_after_handshake is resent every time REST_write_limit_order_bulk is called.

EDIT 2:

Thanks for the time and effort @sehe. However, I am still missing some points:

  1. The only use of write_after_handshake() is to send a first (dummy) request that should do nothing except keep the connection alive for the actual requests that follow. (If I comment it out, I get the error stated above: terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>' what(): end of stream.) In this context, that would be a request with an empty message, or with 0 quantity, which returns an error (as expected) from the server. Is there a cleaner way to do that, i.e. a request that just establishes and maintains the connection?

  2. If I use write_after_handshake(), then all the following requests are correctly sent, but you can see with the calls to cout that the response for the first request in write_after_handshake is always displayed:

     {"error":{"message":"Invalid orderQty","name":"ValidationError"}}{"orderID":"... OK here...}
    

and so I understand that the first request is always resent ? This is the main reason of my question, how to avoid that ?

It seems to be the expected behavior: the cache is not cleared. Should I manage something myself here (is a cache too large a problem over time) ? Anyway, to display only the last response this seems to work:

response_ = {};
  3. Beyond implementation issues, my initial goal was just the following: a) Establish and keep alive a connection to a remote server; b) Inside an infinite loop, when some event occurs (such as a user asking for data), form and send the request to the exchange with the minimal possible delay (as the connection would already be alive). The 30-second sleep is used to simulate the delay between establishing the connection and sending an actual request for a user.

Solution

  • The biggest issue looks to be that you're mixing async and sync IO. From the fact that you're trying to "do stuff" after the rest_ioc.run() I'm concluding that you don't really know how the async code works, and probably don't need it anyways.

    Therefore, I'd rewrite to be synchronous.

    // Blocking replacement for the async resolve -> connect -> handshake chain.
    // Look up the domain name
    tcp::resolver resolver(stream.get_executor());
    // Connect the underlying TCP layer to the resolved endpoints.
    beast::get_lowest_layer(stream)
        .connect(resolver.resolve("www.bitmex.com", "443"));
    // Perform the SSL handshake
    stream.handshake(ssl::stream_base::client);
    
    // First request on the freshly established connection.
    write_after_handshake();
    // Idle period before the remaining requests are sent.
    std::this_thread::sleep_for(30'000ms); // or just 30s...
    // Keeps sending while the post-incremented counter is below 5.
    while (n_tests++ < 5) {
        write_limit_order_bulk();
    }
    

    In reply to

    1. The function write_after_handshake sends an incomplete request to establish and maintain connection, as REST_on_handshake is empty.

    That's inaccurate. It doesn't send a request, not even an incomplete one. It just connects the SSL connection.

    but the undesirable effect is that at every new request, that initial empty request is resent just before the legitimate new request. I have no idea why.

    That's exactly what you write in write_after_handshake()...

    limit_order_msg =
        ""; // Empty message to establish and maintain connection
    

    That clears the request. If you don't want that, don't do that.

    Working back from the remark:

    The initial request sent in write_after_handshake is resent every time REST_write_limit_order_bulk is called.

    I can assume that you want the initial message to actually be {"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0} like in process_limit_order_bulk_res. So let me fix it:

    limit_order_msg =
        R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0})";
    

    Complete Example

    Many style fixes, C++ over C improvements etc. included.

    Live On Compiler Explorer

    // Boost & Beast headers
    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <boost/beast/version.hpp>
    
     // REST headers
    #include <iomanip>
    #include <iostream>
    #include <openssl/evp.h>
    #include <openssl/hmac.h>
    #include <sstream>
    
    namespace beast = boost::beast;     // from <boost/beast.hpp>
    namespace http  = beast::http;      // from <boost/beast/http.hpp>
    namespace net   = boost::asio;      // from <boost/asio.hpp>
    namespace ssl   = boost::asio::ssl; // from <boost/asio/ssl.hpp>
    
    using namespace std::chrono_literals;
    using tcp = boost::asio::ip::tcp;
    using beast::error_code;
    
    /// Synchronous BitMEX REST test client: connects over TLS, then sends
    /// signed POST /api/v1/order requests on one kept-alive connection.
    struct BitMEX_MM {
      private:
        beast::ssl_stream<beast::tcp_stream> stream_;

        beast::flat_buffer                buffer_;   // read buffer, reused across responses
        http::request<http::string_body>  request_;  // reusable request; headers set once in run()
        http::response<http::string_body> response_; // most recent response only

        std::string const apiKey_    = ""; // FILL IN API KEY
        std::string const apiSecret_ = ""; // FILL IN API SEC
        int const         expiry_t_  = 5;  // seconds added to now for "api-expires"
        int               n_tests_   = 0;  // loop counter, also used as clOrdID

        /// Computes the HMAC-SHA256 (keyed with apiSecret_) over
        /// method + target + api-expires + body of request_ and stores it,
        /// hex-encoded, in the "api-signature" header.
        /// Requires "api-expires" to be present (at() throws otherwise).
        void sign_request() {
            std::ostringstream buf;
            buf << request_.method() << request_.target()
                << request_.at("api-expires") << request_.body();
            auto const data = buf.str();

            std::cout << "DEBUG: " << data << "\n";

            ::HMAC_CTX* ctx = HMAC_CTX_new();
            ::HMAC_Init_ex(ctx, apiSecret_.data(),
                           static_cast<int>(apiSecret_.length()), EVP_sha256(),
                           nullptr);
            ::HMAC_Update(ctx,
                          reinterpret_cast<unsigned char const*>(data.data()),
                          data.length());

            unsigned int  len = 0;
            unsigned char out[EVP_MAX_MD_SIZE]{};
            ::HMAC_Final(ctx, out, &len);
            ::HMAC_CTX_free(ctx);

            std::stringstream signature;
            signature << std::hex << std::setfill('0');
            for (unsigned i = 0; i < len; ++i)
                signature << std::setw(2) << static_cast<unsigned int>(out[i]);
            request_.set("api-signature", signature.str());
        }

        /// Sends request_body as a signed request, reads the reply and prints
        /// request, response and elapsed time.
        /// Throws boost::system::system_error on I/O failure.
        void perform_request(std::string const& request_body) {
            request_.set("api-expires", std::to_string(time(0) + expiry_t_));
            request_.body() = request_body;
            request_.prepare_payload();
            sign_request();

            auto start = std::chrono::steady_clock::now();

            http::write(stream_, request_);
            // Read into an empty message; otherwise the previous reply's body
            // is still there and gets printed in front of the new one.
            response_ = {};
            http::read(stream_, buffer_, response_);

            double time_taken = (std::chrono::steady_clock::now() - start)/1.0s;

            std::cout << " ------- request: \n" << request_ << std::endl;
            std::cout << " ------- response: \n" << response_ << std::endl;
            std::cout << " ------- response time: " << time_taken << std::endl;
        }

        /// Sends the initial order request right after the handshake.
        void write_after_handshake() {
            perform_request(R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","side":"Buy","price":10.0,"orderQty":1.0})");
        }

        /// Sends an order request tagged with the current n_tests_ as clOrdID.
        void write_limit_order_bulk() {
            // The body is a single JSON object; the stray "]}" that used to
            // follow it made the payload invalid JSON.
            perform_request(
                R"({"symbol":"XBTUSD","ordType":"Limit","execInst":"ParticipateDoNotInitiate","clOrdID":")" +
                std::to_string(n_tests_) +
                R"(","side":"Buy","price":10.0,"orderQty":2})");
        }
      public:
        /// Creates the TLS stream on a strand of ioc using ssl_ctx.
        explicit BitMEX_MM(net::io_context& ioc, ssl::context& ssl_ctx)
            : stream_(make_strand(ioc.get_executor()),
                      ssl_ctx) // NOTE: strand not really required for sync
        {}

        /// Connects, handshakes, sends the initial request, sleeps 30 seconds
        /// and then sends five more order requests.
        /// Throws boost::system::system_error on any failure.
        void run() {
            // Set SNI Hostname (many hosts need this to handshake successfully)
            if (!::SSL_set_tlsext_host_name(stream_.native_handle(),
                                            "www.bitmex.com")) {
                throw boost::system::system_error(
                    static_cast<int>(::ERR_get_error()),
                    net::error::get_ssl_category());
            }

            // Set up an HTTP POST request message
            request_.version(11);
            request_.method(http::verb::post);
            request_.target("/api/v1/order");
            request_.set(http::field::host, "www.bitmex.com");
            request_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            request_.set(http::field::accept, "*/*");
            request_.set(http::field::content_type, "application/json");
            request_.set(http::field::connection, "Keep-Alive");
            request_.set("api-key", apiKey_);
            // Placeholders; overwritten for every request.
            request_.insert("api-expires", "");
            request_.insert("api-signature", "");

            // Look up the domain name
            tcp::resolver resolver(stream_.get_executor());
            beast::get_lowest_layer(stream_)
                .connect(resolver.resolve("www.bitmex.com", "443"));
            // Perform the SSL handshake
            stream_.handshake(ssl::stream_base::client);

            write_after_handshake();
            std::this_thread::sleep_for(30'000ms); // or just 30s...
            while (n_tests_++ < 5) {
                write_limit_order_bulk();
            }
        }
    };
    
    int main() {
        try {
            net::io_context ioc;
            ssl::context    ctx{ssl::context::tlsv12_client};
    
            BitMEX_MM algo(ioc, ctx);
    
            std::cout << "Running http test." << std::endl;
            algo.run();
    
        } catch (boost::system::system_error const& se) {
            error_code ec = se.code();
            std::cerr << "Error: " << se.code().message();
            if (ec.has_location())
                std::cerr << " (from " << se.code().location() << ")";
            std::cerr << std::endl;
        }
    }
    

    Which on my system shows (obviously failing because of missing API key & secrets): enter image description here