Search code examples
websocket boost c++17 stack-overflow future

"Process finished with exit code -1073740791 (0xC0000409)" when using a vector of futures


Background

I am new to the Binance WebSocket API and to using std::future. I wrote a program to measure the time difference between my local machine and the Binance server, and to measure the network delay.

I pushed my code to my GitHub repository, ws_binance_time_diff_delay.

I have 2 problems.

Problem 1.

The program does not run to completion.

It shows:

{"id":1,"status":200,"result":{"serverTime":1718697917878},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718697921760; 1718697921956; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;

After that, it neither responds nor terminates.

Problem 2.

In the file ws_diff_delay.cpp, look at lines 84 and 85. If I use the former (line 84), it shows:

{"id":1,"status":200,"result":{"serverTime":1718698030116},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718698034095; 1718698034251; 1718698030291;
1718698034095; 1718698034314; 1718698030342;
1718698034095; 1718698034314; 1718698030342;

Process finished with exit code -1073740791 (0xC0000409)

It terminates on its own and shows what looks like a stack overflow message: "Process finished with exit code -1073740791 (0xC0000409)".

I debugged and traced it to line 97.

It stops in the assembly file debug_assamble at line 24:

subl   $0xd8, %esp
movq   0x2e3942(%rip), %rax
xorq   %rsp, %rax
movq   %rax, 0xc0(%rsp)
andq   $0x0, 0x28(%rsp)
leaq   -0x26(%rip), %rax          ; RaiseException
andl   $0x1, %edx
movl   %ecx, 0x20(%rsp)
movl   %edx, 0x24(%rsp)
movq   %rax, 0x30(%rsp)
testq  %r9, %r9
je     0x18004468a
movl   $0xf, %eax
leaq   0x40(%rsp), %rcx
cmpl   %eax, %r8d
movq   %r9, %rdx
cmovbel %r8d, %eax
movl   %eax, %r8d
movl   %r8d, 0x38(%rsp)
shlq   $0x3, %r8
callq  0x1800b7c77
leaq   0x20(%rsp), %rcx
callq  *0x1f864c(%rip)
nopl   (%rax,%rax)
movq   0xc0(%rsp), %rcx
xorq   %rsp, %rcx
callq  0x1800af760
addq   $0xd8, %rsp
retq
int3
andl   $0x0, 0x38(%rsp)
jmp    0x180044660
int3
int3
int3
int3
int3
int3
int3
jno    0x18004465c
popq   %rbx

The state just before the exception is shown here: Scene before exception

What is the difference between line 84 and line 85?

I print the type on line 72 to make sure the type is correct.


  • File ./CMakeLists.txt
     # Build script for the single-executable project ws_binance_time_diff_delay.
     cmake_minimum_required(VERSION 3.24)
     project(ws_binance_time_diff_delay)
     # Compile as C++17.
     set(CMAKE_CXX_STANDARD 17)
     # Opt in to Boost.Thread's extended future API (continuations, when_all/when_any).
     # NOTE(review): ws_diff_delay.cpp as listed uses std::future/std::async, not boost::future —
     # confirm these definitions are actually needed.
     add_definitions(
             -DBOOST_THREAD_PROVIDES_FUTURE
             -DBOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
             -DBOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
     )
    
     # The only target: one executable built from one source file.
     add_executable(ws_binance_time_diff_delay
             ws_diff_delay.cpp
     )
    
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${CMAKE_SOURCE_DIR})
     # Machine-specific absolute path; presumably where <commonUtils.h> lives — verify before building elsewhere.
     target_include_directories(ws_binance_time_diff_delay PRIVATE "C:/Users/Mike_Wei/CLionProjects/commonUtils")
    
     # JSON parsing (ws_diff_delay.cpp includes <rapidjson/document.h>).
     find_package(RapidJSON CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE rapidjson)
    
     find_package(OpenSSL REQUIRED)
     # NOTE(review): ZLIB is located here but no target in this listing links against it.
     find_package(ZLIB REQUIRED)
     find_package(Boost REQUIRED COMPONENTS system thread date_time log log_setup program_options)
    
     # Boost is linked through the legacy variables here and again through imported targets below.
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${Boost_INCLUDE_DIRS})
     target_link_libraries(ws_binance_time_diff_delay PRIVATE ${Boost_LIBRARIES})
    
     find_package(Threads REQUIRED)
     target_include_directories(ws_binance_time_diff_delay PRIVATE ${OPENSSL_INCLUDE_DIR})
     # TLS (OpenSSL), logging (Boost.Log) and the platform thread library.
     target_link_libraries(ws_binance_time_diff_delay PRIVATE
             Boost::system
             Boost::log
             OpenSSL::SSL
             OpenSSL::Crypto
             Threads::Threads
     )
    
     # NOTE(review): ws_diff_delay.cpp as listed does not include a jsoncpp header directly;
     # presumably commonUtils.h needs it — confirm.
     find_package(jsoncpp CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE JsonCpp::JsonCpp)
    
     find_package(fmt CONFIG REQUIRED)
     target_link_libraries(ws_binance_time_diff_delay PRIVATE fmt::fmt)
    
    
  • File ws_diff_delay.cpp
     #include <cstdlib>
     #include <stdint.h>
    
     #include <iostream>
     #include <string>
     #include <optional>
     #include <future>
     #include <chrono>
     #include <functional>
    
     #include <boost/beast/core.hpp>
     #include <boost/beast/websocket.hpp>
     #include <boost/beast/websocket/ssl.hpp>
     #include <boost/asio.hpp>
     #include <boost/asio/ssl.hpp>
     #include <boost/log/trivial.hpp>
    
     #include <rapidjson/document.h>
    
     #include <commonUtils.h>
    
     class mini_ws_client { // Blocking WebSocket-over-TLS client: the constructor connects, the helpers time request/response round trips.
         using tcp = boost::asio::ip::tcp;
    
         const std::string host = "testnet.binance.vision"; // default endpoint; the constructor's initializer list overrides host/port/target
         const std::string port = "443";
         const std::string target = "/ws-api/v3";
    
         boost::asio::ssl::context ctx{boost::asio::ssl::context::sslv23_client};
         boost::asio::io_context ioc;
         std::shared_ptr<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > > ws_sp; // the single stream used by every member function
    
     public:
         explicit mini_ws_client(
             const std::string host = "testnet.binance.vision",
             const std::string port = "443",
             const std::string target = "/ws-api/v3"
         ): host(host), port(port), target(target) { // parameters shadow the members of the same name inside this body
             tcp::resolver resolver(ioc);
             ws_sp = std::make_shared<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > >(ioc, ctx);
             if (!SSL_set_tlsext_host_name(ws_sp->next_layer().native_handle(), host.c_str())) { // set the TLS SNI host name before the TLS handshake
                 boost::system::error_code ec{static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category()};
                 throw boost::system::system_error{ec};
             }
             auto const results = resolver.resolve(host, port);
             auto ep = boost::asio::connect(ws_sp->next_layer().next_layer(), results); // TCP connect on the lowest layer
             ws_sp->next_layer().handshake(boost::asio::ssl::stream_base::client); // TLS handshake
             ws_sp->set_option(boost::beast::websocket::stream_base::decorator(
                 [](boost::beast::websocket::request_type &req) {
                     req.set(boost::beast::http::field::user_agent,
                             std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
                 }));
             ws_sp->handshake(host, target); // WebSocket upgrade handshake
         }
    
         ~mini_ws_client() {
             ws_sp->close(boost::beast::websocket::close_code::normal); // NOTE(review): this close() overload reports failure by throwing, and this is a destructor
             ws_sp.reset();
         }
    
         std::pair<double, double> get_diff_delay(
             int test_times = 100,
             int one_group_num = 10,
             const std::string msg = "{\"id\":1,\"method\":\"time\"}"
         ) const { // returns calculateTimeDiffDelaySub() over every timestamp triple collected below
             auto future_example = std::async( // starts one extra real request; only the future's type is printed below
                 std::launch::async,
                 &mini_ws_client::get_one_timestamps,
                 this,
                 msg
             );
             std::cout << "type of future_example is " << typeid(future_example).name() << std::endl;
             std::vector<int64_t> timeStamps{};
             int cnt{test_times}; // requests still to be issued
             while (cnt) {
                 int cur_group_num{one_group_num};
                 if (cnt > one_group_num) {
                     cnt -= one_group_num;
                 } else {
                     cur_group_num = test_times; // NOTE(review): the last group is sized by test_times, not by the remaining cnt — likely unintended
                     cnt = 0;
                 }
                 std::cout << "test group num " << cur_group_num << "; " << "remain num " << cnt << std::endl;
                 std::vector<std::future<std::vector<int64_t> > > time_stamps_vec{};
                 // std::vector<decltype(future_example) > time_stamps_vec{};
                 for (int i{0}; i < cur_group_num; ++i) {
                     time_stamps_vec.emplace_back( // each task calls get_one_timestamps on this same object, hence on the same ws_sp
                         std::async(
                             std::launch::async,
                             &mini_ws_client::get_one_timestamps,
                             this,
                             msg
                         )
                     );
                 }
                 for (auto &timestamps_future: time_stamps_vec) {
                     auto one_timestamp = timestamps_future.get(); // blocks until that task finishes; rethrows if the task threw
                     printVector(one_timestamp);
                     timeStamps.insert(timeStamps.end(), one_timestamp.begin(), one_timestamp.end());
                 }
             }
             auto diff_delay = calculateTimeDiffDelaySub(timeStamps);
             return diff_delay;
         }
    
         std::vector<int64_t> get_one_timestamps(const std::string &msg) const { // one request: {pre-send ms, post-receive ms, server ms}, or empty if no serverTime was parsed
             std::vector<int64_t> timeStamps_one_case{};
             auto presend_time = getTimeStamp();
             auto response_str = request_to_response(msg);
             auto postsend_time = getTimeStamp();
             auto server_time = get_server_time(response_str);
             if (server_time) {
                 timeStamps_one_case.push_back(presend_time);
                 timeStamps_one_case.push_back(postsend_time);
                 timeStamps_one_case.push_back(server_time.value());
             }
             printVector(timeStamps_one_case); // printVector is not defined in this listing (presumably from commonUtils.h)
    
             return timeStamps_one_case;
         }
    
         std::pair<double, double> calculateTimeDiffDelaySub(const std::vector<int64_t> &timeStamps) const { // averages (diff, delay) over consecutive (pre, post, server) triples
             double diff{0.};
             double delay{0.};
             for (int i{0}; i < timeStamps.size(); i += 3) { // reads i, i+1, i+2, so size() must be a multiple of 3; int index compared against size_t
                 diff += (
                     timeStamps[i]
                     + timeStamps[i + 1]
                     - (timeStamps[i + 2] << 1) // server time times two
                 ) * 0.5; // i.e. midpoint of the two local stamps minus the server stamp
                 delay += (timeStamps[i + 1] - timeStamps[i]) >> 1; // integer half of the round-trip time
             }
             diff /= timeStamps.size() / 3; // size()/3 is 0 when the input is empty
             delay /= timeStamps.size() / 3;
             auto ret = std::make_pair(diff, delay);
             return ret;
         }
    
    
         int64_t getTimeStamp(int64_t diff = 0LL) const { // milliseconds since the system_clock epoch, minus diff
             // Get the current time point
             auto now = std::chrono::system_clock::now();
    
             // Convert the time point to milliseconds
             auto duration = now.time_since_epoch();
             auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    
             // Return the 13-digit timestamp
             return millis - diff;
         }
    
    
         std::string request_to_response(const std::string &msg) const { // sends msg, then blocks for one reply and returns it as a string
             ws_sp->write(boost::asio::buffer(msg));
             boost::beast::flat_buffer buffer;
             ws_sp->read(buffer);
             const char *bufferData = reinterpret_cast<const char *>(buffer.data().data()); // view the buffer's readable bytes as chars
             std::size_t bufferSize = buffer.data().size();
             std::string response(bufferData, bufferSize);
             return response;
         }
    
         std::optional<int64_t> get_server_time(const std::string &msg) const { // extracts result.serverTime from a JSON reply; nullopt plus a fatal log line on any mismatch
             rapidjson::Document msg_doc;
             msg_doc.Parse(msg.c_str());
             if (msg_doc.HasParseError() || !msg_doc.IsObject()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " << "JSON parse error or not an object" <<
          std::endl;
                 return std::nullopt;
             }
             if (!msg_doc.HasMember("result") || !msg_doc["result"].IsObject()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
          "No 'result' field or 'result' is not an object" << std::endl;
                 return std::nullopt;
             }
             const rapidjson::Value &result = msg_doc["result"];
             if (!result.HasMember("serverTime") || !result["serverTime"].IsInt64()) {
                 BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
      "'serverTime' field missing or not an int64"
          << std::endl;
                 return std::nullopt;
             }
             int64_t serverTime = result["serverTime"].GetInt64();
             return serverTime;
         }
     };
    
    
     int main() {
         auto ws_client = std::make_shared<mini_ws_client>("ws-api.binance.com", "443", "/ws-api/v3");
         std::cout << ws_client->request_to_response("{\"id\":1,\"method\":\"time\"}") << std::endl;;
         auto &[diff,delay] = ws_client->get_diff_delay();
         std::cout << "diff = " << diff << std::endl;
         std::cout << "delay = " << delay << std::endl;
     }
    

Solution

  • 0xc0000409 means STATUS_STACK_BUFFER_OVERRUN

    Running your code with sanitizers:

    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address,undefined ")
    

    Quickly diagnoses some issues:

    enter image description here

    Clearly some buffer isn't valid for the duration of the write operation.

    I refactored your code to be readable: https://coliru.stacked-crooked.com/a/079b4b92556cd365

    • use strong typed samples instead of nested vectors:
      // One round-trip measurement: local timestamp taken before sending (pre), local
      // timestamp taken after the reply arrived (post), and the server-reported time (server).
      // A named struct replaces the positional alternative below.
      // using Sample  = std::array<int64_t, 3>;
      struct Sample { int64_t pre, post, server; };
      // A series of measurements.
      using Samples = std::vector<Sample>;
      
    • Now you get much simpler logic:
      // Averages the clock difference and the delay over all samples.
      //   diff  = mean of ((pre + post) / 2 - server): midpoint of the two local stamps minus the server stamp
      //   delay = mean of ((post - pre) / 2): half of the measured round-trip time
      // Returns {0, 0} for an empty input instead of dividing by zero (which would yield NaN).
      std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) const {
          double diff{0.};
          double delay{0.};
          if (!samples.empty()) {
              for (auto& [pre, post, server] : samples) {
                  diff  += (pre + post - (server * 2)) * 0.5;
                  delay += (post - pre) / 2.0;
              }
              diff  /= samples.size();
              delay /= samples.size();
          }
          return std::pair(diff, delay);
      }
      

    Regardless, you're creating wild threads that all use the same websocket without any synchronization. That's both bad and useless.

    • it's bad because it invokes Undefined Behavior due to the data races
    • it's useless because on a single connection you cannot send another request before the first one is completed

    Did you mean to repeatedly request on the same thread or did you mean to test with several client connections in parallel?

    Refactored For Parallel Batches

    Where each batch is necessarily sequential because it runs on a single client:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/websocket/ssl.hpp>
    #include <iostream>
    #include <rapidjson/document.h>
    
    #include <fmt/chrono.h>
    #include <fmt/ranges.h>
    // Debug helper: prints `v` via fmt, prefixed with "printVector: " and followed by a newline.
    // The `auto` parameter makes this an abbreviated function template (C++20 syntax).
    // It is not called anywhere else in this listing.
    static inline void printVector(auto const& v) { fmt::print("printVector: {}\n", v); }
    
    // One round-trip measurement: local timestamp before the request (pre), local
    // timestamp after the response (post), and the server-reported time (server).
    // A named struct replaces the positional alternative below.
    // using Sample  = std::array<int64_t, 3>;
    struct Sample { int64_t pre, post, server; };
    // A series of measurements.
    using Samples = std::vector<Sample>;
    
    // Request payloads are handed around as shared, immutable strings so that
    // copies passed to other threads all refer to the same text.
    using Message = std::shared_ptr<std::string const>;

    // Returns the default request payload. The string is created once, on first
    // call, and the same shared instance is returned on every later call.
    static Message default_message() {
        static Message const shared_payload{
            std::make_shared<std::string const>(R"({"id":1,"method":"time"})")};
        return shared_payload;
    }
    
    namespace asio      = boost::asio;
    namespace ssl       = asio::ssl;
    namespace websocket = boost::beast::websocket;
    using tcp           = asio::ip::tcp;
    using error_code    = boost::system::error_code;
    using namespace std::chrono_literals;
    
    // Connection parameters for one Client. The defaults point at the testnet
    // endpoint; main() in this listing overrides all three fields.
    struct Args {
        std::string host   = "testnet.binance.vision"; // DNS name; Client also uses it for SNI and the WebSocket handshake
        std::string port   = "443";                    // service/port string passed to the resolver
        std::string target = "/ws-api/v3";             // request target of the WebSocket handshake
    };
    
    // Blocking WebSocket-over-TLS client. The constructor connects and performs all
    // handshakes; the destructor sends a normal close. Nothing in this class
    // synchronizes access to `ws`.
    class Client {
        using tcp = asio::ip::tcp; // repeats the file-scope alias of the same name

        // Members are initialized in declaration order: ctx and ioc must precede ws,
        // which is constructed from both.
        Args const                                  args;
        ssl::context                                ctx{ssl::context::sslv23_client};
        asio::io_context                            ioc;
        websocket::stream<ssl::stream<tcp::socket>> ws{ioc, ctx};

      public:
        // Stores the endpoint description and connects immediately; connect() throws
        // boost::system::system_error on failure.
        // NOTE(review): no verify mode or CA paths are configured on ctx in this listing.
        Client(Args args = {}) : args(std::move(args)) { connect(); }

        // Closes the WebSocket with close_code::normal. The error_code overload is used so
        // that a failure is written to stderr instead of being thrown out of the destructor.
        ~Client() {
            error_code ec;
            ws.close(websocket::close_code::normal, ec);
            if (ec.failed())
                std::cerr << "~Client: " << ec.message() << std::endl;
        }

        // Sends *msg, then blocks until one reply has been read and returns it.
        // Both calls use the throwing overloads, so failures surface as exceptions.
        std::string request(Message msg) {
            ws.write(asio::buffer(*msg));
            std::string response;
            auto buf = asio::dynamic_buffer(response); // the read appends directly into `response`
            ws.read(buf);
            return response;
        }

      private:
        // Establishes the connection layer by layer: SNI, DNS resolve, TCP connect,
        // TLS handshake, then the WebSocket handshake. The order of these steps matters.
        void connect() {
            tcp::resolver resolver(ioc);
            // Set the TLS SNI host name; must happen before the TLS handshake below.
            if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), args.host.c_str()))
                throw boost::system::system_error{
                    error_code{static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()}};

            auto results = resolver.resolve(args.host, args.port);
            asio::connect(ws.next_layer().next_layer(), results); // TCP connect on the lowest layer

            ws.next_layer().handshake(ssl::stream_base::client); // TLS handshake
            // Adds a User-Agent header to the WebSocket upgrade request.
            ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(boost::beast::http::field::user_agent,
                        std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
            }));
            ws.handshake(args.host, args.target); // WebSocket upgrade handshake
        }
    };
    
    // Measures the clock difference and delay against the server by issuing "time"
    // requests in parallel batches. Each batch owns one connection and is sequential.
    namespace Benchmark {
        using clock      = std::chrono::system_clock;
        using time_point = clock::time_point;

        // Current system_clock time as whole milliseconds since the clock's epoch.
        int64_t getTimeStamp() { return clock::now().time_since_epoch() / 1ms; }

        // Extracts the integer `result.serverTime` from a JSON reply.
        // Returns an empty optional (after a message on stderr) if the reply does not
        // parse, is not an object, or lacks an int64 `result.serverTime`.
        std::optional<int64_t> get_server_time(std::string const& msg) {
            rapidjson::Document doc;
            doc.Parse(msg.c_str());

            if (!doc.HasParseError() && doc.IsObject() && doc.HasMember("result"))
                if (auto const& result = doc["result"];
                    result.IsObject() && result.HasMember("serverTime") && result["serverTime"].IsInt64())
                    return result["serverTime"].GetInt64();

            std::cerr << "Unexpected or invalid response" << std::endl;
            return {};
        }

        // Averages the clock difference and the delay over all samples.
        //   diff  = mean of ((pre + post) / 2 - server)
        //   delay = mean of ((post - pre) / 2), i.e. half the measured round trip
        // Returns {0, 0} when `samples` is empty.
        std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) {
            double diff{0.};
            double delay{0.};
            if (!samples.empty()) {
                for (auto& [pre, post, server] : samples) {
                    diff  += (pre + post - (server * 2)) * 0.5;
                    delay += (post - pre) / 2.0;
                }
                diff  /= samples.size();
                delay /= samples.size();
            }
            return std::pair(diff, delay);
        }

        // Opens ONE connection and performs `n` request/response round trips on it,
        // one after another, returning a sample per round trip.
        //
        // A boost::system::system_error (connect/read/write failure) is logged and ends
        // the batch early; the samples collected so far are returned. A reply without a
        // server time throws std::runtime_error, which this function does not catch.
        Samples perform_batch(Args const& args, unsigned n, Message msg) {
            Samples samples;
            try {
                // Exactly one client per batch. A second, unused connection would only
                // add handshake cost and count against the server's rate limits.
                Client client(args);
                while (n--) {
                    int64_t     pre  = getTimeStamp();
                    std::string res  = client.request(msg);
                    int64_t     post = getTimeStamp();

                    if (std::optional<int64_t> server_time = get_server_time(res))
                        samples.push_back({pre, post, server_time.value()});
                    else
                        throw std::runtime_error("No server time in response");
                }
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error in perform_batch: " << se.code().message() << std::endl;
            }
            return samples;
        }

        // Issues `total` requests split into batches of at most `pergroup`, runs the
        // batches concurrently (one async task and one connection per batch) and returns
        // calculateTimeDiffDelaySub() over the merged samples.
        //
        // An exception that escapes a batch is rethrown here by future::get().
        std::pair<double, double> run(Args const& args, unsigned total = 100, unsigned pergroup = 10,
                                                        Message msg = default_message()) {
            std::vector<std::future<Samples>> futs{};
            for (unsigned remain{total}; unsigned batch = std::min(remain, pergroup); remain -= batch) {
                std::cout << "batch " << batch << "; remain " << remain << std::endl;

                // One task per batch: perform_batch already loops `batch` times, so
                // launching `batch` tasks here would issue batch * batch requests.
                futs.emplace_back(std::async(std::launch::async, perform_batch, args, batch, msg));
            }

            Samples merged;
            for (auto& fut : futs) {
                auto one = fut.get();
                merged.insert(merged.end(), one.begin(), one.end());
            }
            return calculateTimeDiffDelaySub(merged);
        }
    }
    
    int main() {
        Args args {"ws-api.binance.com", "443", "/ws-api/v3"};
        std::cout << Client(args).request(default_message()) << std::endl;
    
        auto const& [diff, delay] = Benchmark::run(args, 20, 4);
        std::cout << std::fixed;
        std::cout << "diff=" << diff << " delay=" << delay << std::endl;
    }
    

    Spurious errors during Client's destructor indicate that we might run into rate limiting. I'll leave that up to you to diagnose:

    enter image description here