Search code examples
visual-c++ udp boost-asio stdvector visual-c++-2022

MSVC vector Debug Assertion with Boost.Asio async UDP server


I'm working on a cross-platform async UDP server with Boost.Asio, and I get the following assertion error when the program is built with MSVC (Visual Studio 2022):

File: <visual studio path>/include/vector
Line: 54

Expression: can't dereference invalidated vector iterator

When building the same code on Linux with GCC, all is fine.

Here's a minimal example of my server :

#include <chrono>
#include <cstdint>
#include <format>
#include <iostream>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;

// UDP server from the question: binds a socket on `port`, drives `context` from
// its own thread, and answers every datagram whose first byte is 0xFF with a
// single 0xFF byte.
class Server {
public:
    asio::io_context context{};
    const port_type port;

private:
    // Receive buffer reused for every datagram.
    std::array<char, 4'096> receivedData{};
    udp::socket socket;
    std::mutex mutex{};

    // Loop flag polled by IO_Thread; cleared by the destructor.
    std::atomic<bool> isRunning{ true };
    // Worker thread running the io_context. It is a data member, so it starts
    // during member initialization, i.e. before the constructor body has called
    // startReceiving().
    // NOTE(review): if run() returns because no work is queued yet, later run()
    // calls may return immediately until restart() is called - confirm against
    // the Asio documentation.
    std::thread IO_Thread{
        [this] {
            try {
                while (this->isRunning.load()) {
                    this->context.run();
                }
            } catch (std::exception& error) {
                // Exceptions thrown by completion handlers surface here and end the process.
                std::cerr << error.what() << std::endl;
                std::exit(1);
            }
        }
    };

    // Queues one asynchronous receive; the completion handler re-arms it.
    void startReceiving() {
        // Function-local static: one endpoint object shared by every call.
        static udp::endpoint clientIP{};

        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'096),
            clientIP,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving data ({} byte(s)): {}", bytesReceived, error.message()) };
                }
                std::lock_guard lock{ this->mutex };
                // Copy of the received bytes as std::uint8_t.
                const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
                // NOTE(review): buf is empty when bytesReceived == 0, and buf[0] is
                // read without a size check.
                if (buf[0] == 0xFF) {
                    // `{ 0xFF }` builds a temporary vector that lives only until
                    // this statement ends.
                    this->send(clientIP, { 0xFF });
                }
                this->receivedData.fill(0);
                this->startReceiving();
            }
        );
    }


public:
    // Binds the socket to UDP/IPv4 on `port` and queues the first receive.
    Server(port_type port) :
        port{ port },
        socket{ this->context, udp::endpoint{ udp::v4(), port }  }
    {
        this->startReceiving();
    };

    // Clears the loop flag and joins the worker thread.
    // NOTE(review): context.stop() is never called, and a receive is always
    // pending, so run() presumably never returns and join() blocks - confirm.
    ~Server() {
        this->isRunning.store(false);
        if (this->IO_Thread.joinable()) {
            this->IO_Thread.join();
        }
    }

    // Queues an asynchronous send of `msg` to `clientIP`.
    // NOTE(review): asio::buffer(msg) refers to the caller's vector, while the
    // handler's `msg` capture is a separate copy. The only caller passes a
    // temporary, so the buffer presumably outlives the storage it points to.
    void send(const udp::endpoint& clientIP, const std::vector<std::uint8_t>& msg) {
        this->socket.async_send_to(
            asio::buffer(msg),
            clientIP,
            [this, msg] (std::error_code error, std::size_t bytesSent) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", msg.size(), bytesSent, error.message()) };
                }
            }
        );
    }
};

// Starts the server on UDP port 5000 and keeps the main thread asleep for four
// seconds; `server` is destroyed when main returns. argc and argv are unused.
int main(int argc, char* argv[]) {
    Server server{ 5'000 };
    std::this_thread::sleep_for(std::chrono::seconds(4));
}

When reducing my actual code, I managed to accurately locate the issue :

// this raises an assertion error
// (buf holds std::uint8_t, so buf[0] == 0xFF can be true and send() is called)
const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
if (buf[0] == 0xFF) {
    this->send(clientIP, { 0xFF });
}

// but this doesn't
// NOTE(review): receivedData holds plain char. Where char is signed (the MSVC
// default) a 0xFF byte reads as -1 and never equals the int 0xFF, so send() is
// presumably never reached in this variant - confirm.
if (this->receivedData[0] == 0xFF) {
    this->send(clientIP, { 0xFF });
}

I saw that std::vector isn't thread-safe, but I struggle to understand why it is an issue here, as the vector isn't shared between multiple threads. Also, I don't really understand why I haven't got any issue on Linux.

Here's a CMakeLists.txt extract :

# Locate Boost's asio component through its CMake package configuration.
find_package(Boost COMPONENTS asio CONFIG REQUIRED)
# Windows only: link the Winsock libraries.
if (WIN32)
    target_link_libraries(asio_udp PRIVATE wsock32)
    target_link_libraries(asio_udp PRIVATE Ws2_32)
endif ()
target_link_libraries(asio_udp PRIVATE Boost::asio) # Boost.Asio is statically linked
# Build the target as C++20.
target_compile_features(asio_udp PRIVATE cxx_std_20)
...
elseif (${CMAKE_CXX_COMPILER_ID} STREQUAL MSVC)
    # Warning C4464 warns about . and .. in #include directive
    # /W3 applies to project code; headers included with angle brackets are
    # treated as external and compiled at warning level 0.
    target_compile_options(asio_udp PRIVATE /W3 /external:anglebrackets /external:W0 /wd4464)
    # NOTE(review): "MultiThreadedDebug" selects the static debug runtime (/MTd)
    # regardless of build configuration - confirm this is intended.
    set_property(TARGET asio_udp PROPERTY MSVC_RUNTIME_LIBRARY "MultiThreadedDebug")
else ()
...

In case it matters, this is my reduced client as well :

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <format>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;

// UDP client from the question: resolves the server, sends one datagram from the
// constructor, then keeps an asynchronous receive pending on its own IO thread.
class Client {
public:
    asio::io_context context{};

private:
    // Receive buffer reused for every datagram.
    std::array<char, 4'096> receivedData{};
    std::mutex mutex{};

    // Worker thread running the io_context.
    // NOTE(review): members are initialized in declaration order, so this thread
    // starts before resolver, socket and isRunning (all declared below) are
    // constructed, yet the lambda reads isRunning right away.
    std::thread IO_Thread{
        [this] {
            try {
                while (this->isRunning.load()) {
                    this->context.run();
                }
            } catch (std::exception& error) {
                std::cerr << error.what() << std::endl;
            }
        }
    };
    udp::resolver resolver{ this->context };
    udp::socket socket;
    udp::endpoint endpoint;
    udp::endpoint serverEndpoint;

    // Loop flag polled by IO_Thread; cleared by the destructor.
    std::atomic<bool> isRunning{ true };

    // Queues one asynchronous receive; the completion handler re-arms it.
    // Only 4'095 of the 4'096 buffer bytes are offered to the socket, and the
    // sender's address is written into serverEndpoint, replacing the resolved one.
    void startReceiving() {
        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'095),
            this->serverEndpoint,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
                }

                this->startReceiving();
            }
        );
    }

public:
    // Resolves serverHostName:serverPort over IPv4 (first result), opens the
    // socket, sends one datagram synchronously and queues the first receive.
    Client(const std::string& serverHostName, port_type serverPort) :
        socket{ this->context },
        endpoint{},
        serverEndpoint{ *this->resolver.resolve(udp::v4(), serverHostName, std::to_string(serverPort)).begin() }
    {
        this->socket.open(udp::v4());
        // NOTE(review): the braced list holds an int literal, so this is
        // presumably not a one-byte buffer - confirm which asio::buffer overload
        // is selected and what it points to.
        this->socket.send_to(asio::buffer({ 0xFF }), this->serverEndpoint);
        this->startReceiving();
    }

    // Calls send(), clears the loop flag, stops the io_context under the mutex
    // and joins the worker thread.
    ~Client() {
        this->send({ 0x00 }); // disconnect
        this->isRunning.store(false);
        if (this->IO_Thread.joinable()) {
            this->mutex.lock();
            this->context.stop(); // discarding potential pending async tasks
            this->mutex.unlock();
            this->IO_Thread.join();
        }
    }

    // NOTE(review): despite its name this function never transmits anything.
    // `str` is unused and the body queues another async_receive_from, exactly
    // like startReceiving().
    void send(const std::vector<std::uint8_t>& str) {
        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'095),
            this->serverEndpoint,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
                }

                this->startReceiving();
            }
        );
    }
};

// Connects a client to 127.0.0.1:5000 and keeps the main thread asleep for four
// seconds; `client` is destroyed when main returns.
int main() {
    Client client{ "127.0.0.1", 5'000 };
    std::this_thread::sleep_for(std::chrono::seconds(4));
}

Solution

    1. There is UB here:

          socket.async_send_to(
              asio::buffer(msg), clientIP
      

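      msg refers to a temporary vector (the { 0xFF } built at the call site) that is destroyed as soon as send returns, while the asynchronous operation still uses a buffer pointing into it. Capturing msg by value in the handler does not help: that copy is a different object from the one asio::buffer(msg) refers to. This is also why only MSVC complains: in debug builds Asio's buffer debugging keeps a vector iterator and checks it when the operation completes, which triggers the assertion. With GCC (without _GLIBCXX_DEBUG) the bug is still there; it simply goes undetected. clientIP refers to the same static variable that is also used for the next async_receive_from.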

    2. The mutex guarding io_context::stop is useless (io_context is threadsafe).

    3. The mutex around the buffer handling in async_receive_from completion handler is also questionable: there's only one IO thread, and the completion handler always runs from that.

    4. The send function on the Client does... not send.

    5. The receive operation on the Client overwrites the server endpoint.

    6. The following call does NOT do what you think it does:

      asio::buffer({0xFF})
      

      See it for yourself: https://coliru.stacked-crooked.com/a/d73a3dad5a08a31f

      Don't "fix" it using {'\xFF'} - that's still UB (the buffer points to a temporary). Use e.g. asio::buffer("\xFF", 1) instead.

    7. Instead of mutexes, consider a strand. You already have a logical (implicit) strand, since only the IO thread runs the handlers; main is just waiting.

    8. For bonus points, don't aggregate threads or contexts at all. Instead just pass an executor, which lets the user decide on strands/threads/who runs the service.

    Adding a minimal amount of logging so we can see the traffic going both ways:

    • File server.cpp

      #include <boost/asio.hpp>
      #include <format>
      #include <iostream>
      #include <fmt/ranges.h>
      #include <boost/lexical_cast.hpp>
      
      namespace asio = boost::asio;
      using asio::ip::udp;
      using asio::ip::port_type;
      
      /// UDP server that answers every datagram starting with byte 0xFF with a
      /// single 0xFF byte.
      ///
      /// The server owns neither a thread nor an io_context: it runs on whatever
      /// executor the caller passes in, so the caller decides who drives the I/O.
      class Server {
          std::array<char, 4'096> incoming_{}, outgoing_{};
          udp::socket             socket_;
          udp::endpoint           peer_; // sender of the datagram currently being handled

          /// Queues one asynchronous receive. On success the handler processes the
          /// datagram and re-arms the receive; on error it logs and stops the loop.
          void startReceiving() {
              socket_.async_receive_from(asio::buffer(incoming_), peer_, [this](std::error_code error, size_t n) {
                  if (error) {
                      // Terminate the line so consecutive diagnostics do not run together.
                      std::cerr << std::format("Error receiving ({} byte(s) effectively received): {}", n,
                                               error.message())
                                << '\n';
                  } else {
                      std::string_view buf(incoming_.data(), n); // not copying
                      fmt::print("Received {::#02x} from {}\n", std::span(buf),
                                 boost::lexical_cast<std::string>(peer_));

                      // starts_with is safe on an empty datagram, unlike buf[0]. A char
                      // literal avoids the implicit int-to-char narrowing of 0xFF.
                      if (buf.starts_with('\xFF'))
                          send(peer_, {0xFF});

                      startReceiving();
                  }
              });
          }

        public:
          /// Binds a UDP/IPv4 socket to `port` on executor `ex` and queues the first receive.
          Server(asio::any_io_executor ex, port_type port) //
              : socket_{ex, udp::endpoint{udp::v4(), port}} {
              startReceiving();
          }

          /// Queues an asynchronous send of `msg` to `peer`.
          ///
          /// The payload is copied into `outgoing_`, so `msg` may be a temporary.
          /// At most `outgoing_.size()` bytes are sent; longer messages trip the
          /// assertion in debug builds and are truncated otherwise.
          ///
          /// NOTE: `outgoing_` is shared by all sends. Calling send() again before
          /// the previous operation has completed overwrites its payload.
          void send(udp::endpoint peer, std::vector<uint8_t> const& msg) {
              // A message that fills the buffer exactly is valid, hence <= (this
              // matches the clamp below).
              assert(msg.size() <= outgoing_.size());
              size_t const n = std::min(outgoing_.size(), msg.size());
              std::copy_n(msg.begin(), n, outgoing_.begin());

              socket_.async_send_to(
                  asio::buffer(outgoing_, n), peer, [n](std::error_code error, size_t sent) {
                      if (error)
                          std::cerr << std::format(
                                           "Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", n,
                                           sent, error.message())
                                    << '\n';
                  });
          }
      };
      
      int main() {
          asio::io_context ioc;
      
          Server server{ioc.get_executor(), 5'000};
      
          ioc.run_for(std::chrono::seconds(4));
      }
      
    • File client.cpp

      #include <boost/asio.hpp>
      #include <boost/lexical_cast.hpp>
      #include <fmt/ranges.h>
      #include <format>
      #include <iostream>
      
      namespace asio = boost::asio;
      using asio::ip::port_type;
      using asio::ip::udp;
      
      /// UDP client: greets the server with a 0xFF byte on construction, logs every
      /// datagram it receives, and sends a 0x00 byte on shutdown().
      class Client {
          std::array<char, 4'096> incoming{}, outgoing{};

          udp::socket   socket;
          udp::resolver resolver{socket.get_executor()};
          udp::endpoint peer;         // filled in with the sender of each datagram
          udp::endpoint const server; // resolved once, never overwritten

          /// Queues one asynchronous receive; a successful completion logs the
          /// datagram and queues the next receive, a failure ends the read loop.
          void startReceiving() {
              auto onDatagram = [this](std::error_code ec, size_t received) {
                  if (ec) {
                      std::cerr << std::format("Error while receiving {} byte(s) of data: {}", received,
                                               ec.message())
                                << std::endl;
                      // Not re-arming the read ends the program unless a write is still in flight.
                      return;
                  }

                  fmt::print("Received {::#02x} from {}\n", std::span(incoming).first(received),
                             boost::lexical_cast<std::string>(peer));
                  startReceiving();
              };

              socket.async_receive_from(asio::buffer(incoming), peer, onDatagram);
          }

        public:
          /// Opens an IPv4 UDP socket on `ex`, resolves the server address, sends
          /// the greeting byte synchronously and queues the first receive.
          Client(asio::any_io_executor ex, std::string const& serverHostName, port_type serverPort)
              : socket{ex, udp::v4()}
              , server{*resolver.resolve(udp::v4(), serverHostName, std::to_string(serverPort)).begin()} {
              socket.send_to(asio::buffer("\xFF", 1), server);
              startReceiving();
          }

          /// Cancels the pending receive, then queues the disconnect message.
          void shutdown() {
              socket.cancel();
              send({0x00});
          }

          /// Copies `str` into the outgoing buffer and queues an asynchronous send
          /// of it to the server.
          void send(std::vector<std::uint8_t> const& str) {
              assert(str.size() <= outgoing.size());
              auto const count = std::min(str.size(), outgoing.size());
              std::copy_n(str.begin(), count, outgoing.begin());

              auto onSent = [count](std::error_code ec, size_t /*sent*/) {
                  if (!ec)
                      return;
                  std::cerr << std::format("Error sending {} byte(s) of data: {}", count, ec.message())
                            << std::endl;
              };

              socket.async_send_to(asio::buffer(outgoing, count), server, onSent);
          }
      };
      
      int main() {
          asio::io_context ioc;
      
          Client client{ioc.get_executor(), "127.0.0.1", 5'000};
      
          ioc.run_for(std::chrono::seconds(2));
      
          client.shutdown();
          ioc.run(); // complete shutdown operation
      }