Tags: c++, boost, udp, broadcast, asio

boost::asio UDP Broadcast Client Only Receives "fast" Packets


I have written a UDP broadcast client using boost::asio. It works, but with a caveat. If I send packets very fast (at least one every 100ms or so), it seems to receive all of them. However, if I send only a single packet, it doesn't seem to catch it. I'm using an async receive, so I can't imagine why it is not working. The data itself is fairly small and will always be less than the allocated buffer size. When it does receive the "fast" packets, they are correct and contain only the data from a single "send". In the debugger, it will properly break once per packet sent.

Header:

class BroadcastClient
    {
    public:
        BroadcastClient();
        std::optional<std::string> poll();

    protected:
        void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);

    private:
        std::future<void> ioFuture;
        std::vector<uint8_t> buffer;
        std::string result;
        boost::asio::io_service ioService;
        std::unique_ptr<boost::asio::ip::udp::socket> socket;
        uint16_t port{ 8888 };
        boost::asio::ip::udp::endpoint sender_endpoint;
    };

Implementation:

BroadcastClient::BroadcastClient()
{
    this->socket = std::make_unique<boost::asio::ip::udp::socket>(
        this->ioService, boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::any(), this->port));

    this->socket->set_option(boost::asio::socket_base::broadcast(true));
    this->socket->set_option(boost::asio::socket_base::reuse_address(true));

    this->ioFuture = std::async(std::launch::async, [this] { this->ioService.run(); });
    this->buffer.resize(4096);

    this->socket->async_receive_from(
        boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
        boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if(!error)
    {
        this->result += std::string(std::begin(buffer), std::begin(buffer) + buffer.size());
        std::fill(std::begin(buffer), std::end(buffer), 0);
        
        this->socket->async_receive_from(
            boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
            boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }
}

std::optional<std::string> BroadcastClient::poll()
{
    if(this->result.empty() == false)
    {
        auto copy = this->result;
        this->result.clear();
        return copy;
    }

    return {};
}


Solution

  • I had a long search going, because broadcast UDP can be finicky. Then I spotted your future<void>. Not only would I not trust std::async to do what you expect (it can do pretty much anything), but there is also a potentially lethal race, and that is 99% certain to be your issue:

    • you launch the async task - it will start /some time in the future/

    • only then do you add the async_receive_from operation. If the task has already started by then, the queue will have been empty, so run() completes immediately and the future is made ready. Indeed, this is visible when you add:

       ioService.run();
       std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
      

    It was printing

    End of run true
    

    most of the time for me. I suggest using a thread:

    ioThread = std::thread([this] {
        ioService.run();
        std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
    });
    

    with corresponding join:

    ~BroadcastClient() {
        std::clog << "~BroadcastClient()" << std::endl;
        ioThread.join();
    }
    

    To be complete, also handle exceptions (see Should the exception thrown by boost::asio::io_service::run() be caught?), or use thread_pool(1), which is nice because it also replaces your io_service.
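
    For instance, a minimal sketch of the thread_pool route (assuming Boost 1.66 or later, which introduced boost::asio::thread_pool); the pool owns its single service thread, so there is no io_service and no std::thread to manage by hand, and the try/catch shows one place to keep exceptions from escaping a handler:

    #include <boost/asio.hpp>
    #include <exception>
    #include <iostream>

    int main() {
        boost::asio::thread_pool io(1); // one worker thread; run() happens internally

        boost::asio::post(io, [] {
            try {
                // ... handler work that may throw ...
            } catch (std::exception const& e) {
                // exceptions must not escape an asio handler
                std::clog << "handler: " << e.what() << std::endl;
            }
        });

        io.join(); // blocks until all queued work has completed
    }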

    Alternatively, use a work guard (the legacy io_service::work, or boost::asio::make_work_guard).
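
    A minimal sketch of the guard (assuming Boost 1.66 or later for executor_work_guard; on older versions, keep an io_service::work object alive instead). The guard must outlive the constructor, so it is stored as a member, exactly as in the full listing below:

    // member: keeps run() from returning while the task queue is momentarily
    // empty, e.g. before the first async_receive_from has been queued
    boost::asio::executor_work_guard<
        boost::asio::io_service::executor_type> work { ioService.get_executor() };

    // in the destructor, before joining: let run() finish once the queue drains
    work.reset();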

    Now, I can't seem to make it miss packets when testing locally.

    More Review

    1. In general you want to know as early as possible when error conditions arise in your code, so report the error in handle_read: such a condition causes the async loop to terminate. See the fixed handle_read below.

    2. The result buffer is not thread-safe and you access it from multiple threads¹. That invokes Undefined Behavior. Add synchronization (see the sketch below), or use e.g. atomic exchanges.

      ¹ to be sure that the poll happens on the service thread you'd have to post the poll operation to the io_service. That's not possible because the service is private
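
      For example, a minimal synchronized poll, using the result_mx mutex that also guards the fixed handle_read in the full listing below; swapping under the lock keeps the critical section short:

      std::optional<std::string> BroadcastClient::poll() {
          std::string out;
          {
              std::lock_guard lk(result_mx); // serialize access with handle_read
              std::swap(out, result);        // take the accumulated data, leave result empty
          }
          if (out.empty())
              return std::nullopt;
          return out;
      }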

    3. You use buffer.size() in handle_read, but that's hard-coded (4096). You probably wanted bytes_transferred:

      result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
      

      This also avoids an unnecessary temporary, and you no longer have to reset the buffer to zeroes:

      void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
          if (!error) {
              std::lock_guard lk(result_mx);
              result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
      
              start_read();
          } else {
              std::clog << "handle_read: " << error.message() << std::endl;
          }
      }
      
    4. Why is the socket dynamically allocated? You should initialize it in the constructor's initializer list or, since C++11, with an NSDMI (non-static data member initializer):

      uint16_t port{ 8888 };
      boost::asio::io_service ioService;
      udp::socket socket { ioService, { {}, port } };
      
    5. There's duplication of the async_receive_from call. This calls for a start_read or similar method. Also consider using a lambda, which shortens the code and avoids old-fashioned boost::bind:

      void BroadcastClient::start_read() {
          socket.async_receive_from(
              boost::asio::buffer(buffer), sender_endpoint,
              [this](auto ec, size_t xfr) { handle_read(ec, xfr); });
      }
      

    Full Listing

    Live On Coliru

    #include <boost/asio.hpp>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <thread>
    using namespace std::chrono_literals;
    
    class BroadcastClient {
        using socket_base = boost::asio::socket_base;
        using udp = boost::asio::ip::udp;
      public:
        BroadcastClient();
    
        ~BroadcastClient() {
            std::clog << "~BroadcastClient()" << std::endl;
            socket.cancel();
            work.reset();
            ioThread.join();
        }
        std::optional<std::string> poll();
    
      protected:
        void start_read();
        void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);
    
      private:
        uint16_t port{ 8888 };
        boost::asio::io_service ioService;
        boost::asio::executor_work_guard<
            boost::asio::io_service::executor_type> work { ioService.get_executor() };
        udp::socket socket { ioService, { {}, port } };
    
        std::thread ioThread;
        std::string buffer = std::string(4096, '\0');
        std::mutex result_mx;
        std::string result;
        udp::endpoint sender_endpoint;
    };
    
    BroadcastClient::BroadcastClient() {
        socket.set_option(socket_base::broadcast(true));
        socket.set_option(socket_base::reuse_address(true));
    
        ioThread = std::thread([this] {
            ioService.run();
            std::clog << "Service thread, stopped? " << std::boolalpha << ioService.stopped() << std::endl;
        });
    
        start_read(); // actually okay now because of `work` guard
    }
    
    void BroadcastClient::start_read() {
        socket.async_receive_from(
            boost::asio::buffer(buffer), sender_endpoint,
            [this](auto ec, size_t xfr) { handle_read(ec, xfr); });
    }
    
    void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
        if (!error) {
            std::lock_guard lk(result_mx);
            result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
    
            start_read();
        } else {
            std::clog << "handle_read: " << error.message() << std::endl;
        }
    }
    
    std::optional<std::string> BroadcastClient::poll() {
        std::lock_guard lk(result_mx);
        if (result.empty())
            return std::nullopt;
        else 
            return std::move(result);
    }
    
    constexpr auto now = std::chrono::steady_clock::now;
    
    int main() {
        BroadcastClient bcc;
    
        for (auto start = now(); now() - start < 3s;) {
            if (auto r = bcc.poll())
                std::cout << std::quoted(r.value()) << std::endl;
    
            std::this_thread::sleep_for(100ms);
        }
    } // BroadcastClient destructor safely cancels the work
    

    Tested live with

    g++ -std=c++17 -O2 -Wall -pedantic -pthread main.cpp
    while sleep .05; do echo -n "hello world $RANDOM" | netcat -w 0 -u 127.0.0.1 8888 ; done&
    ./a.out
    kill %1
    

    Prints

    "hello world 18422"
    "hello world 3810"
    "hello world 26191hello world 10419"
    "hello world 23666hello world 18552"
    "hello world 2076"
    "hello world 19871hello world 8978"
    "hello world 1836"
    "hello world 11134hello world 16603"
    "hello world 3748hello world 8089"
    "hello world 27946"
    "hello world 14834hello world 15274"
    "hello world 26555hello world 6695"
    "hello world 32419"
    "hello world 26996hello world 26796"
    "hello world 9882"
    "hello world 680hello world 29358"
    "hello world 9723hello world 31163"
    "hello world 3646"
    "hello world 10602hello world 22562"
    "hello world 18394hello world 17229"
    "hello world 20028"
    "hello world 14444hello world 3890"
    "hello world 16258"
    "hello world 28555hello world 21184"
    "hello world 31342hello world 30891"
    "hello world 3088"
    "hello world 1051hello world 5638"
    "hello world 24308hello world 7748"
    "hello world 18398"
    ~BroadcastClient()
    handle_read: Operation canceled
    Service thread, stopped? true
    

    Old answer contents which may /still/ be of interest

    Wait. I noticed this is not "regular" peer-to-peer UDP.

    As far as I understand, multicast works courtesy of routers. They have to maintain complex tables of endpoints "subscribed" so they know where to forward the actual packets.

    Many routers struggle with these, and there are built-in reliability pitfalls, especially on WiFi etc. It would /not/ surprise me if you had a router (or rather a topology that includes the router) that is struggling with this too and just stops "remembering" the participating endpoints in a multicast group after some time interval.

    I think tables of this type have to be kept in every hop on the route (including the kernel which may have to keep track of several processes for the same multicast group).

    Some hints about this:

    One oft-heard piece of advice is:

    • if you can, use multicast for discovery, then switch to unicast
    • try to be specific about the bound interface: in your code you might want to replace address_v4::any() with something like lo (127.0.0.1), or whatever IP address identifies your NIC, as sketched below
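
    A minimal sketch of binding to one specific interface, reusing the ioService and port members from the code above (the 192.168.1.23 address is purely hypothetical; make_address_v4 assumes Boost 1.66 or later, use address_v4::from_string on older versions):

      using udp = boost::asio::ip::udp;
      // hypothetical NIC address; substitute the address of your interface
      auto nic = boost::asio::ip::make_address_v4("192.168.1.23");
      udp::socket socket{ ioService, udp::endpoint{ nic, port } };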