Search code examples
c++serverudpboost-asio

Trying to write UDP server class, io_context doesn't block


I am trying to set up a UDP server. A baby example works: I receive exactly what I expect, and it matches what Wireshark shows. Baby example:

// Entry point of the baby example: creates a UDP/IPv4 socket on port 60001,
// hands it to a UDPServer and runs the io_context on the calling thread.
// Returns 0 when the event loop ends normally and 1 when setup or the event
// loop threw an exception.
int main(int argc, char* const argv[])
{
  try
  {
    boost::asio::io_context io_context;
    boost::asio::ip::udp::endpoint ep(boost::asio::ip::udp::v4(), 60001);
    boost::asio::ip::udp::socket sock(io_context, ep);
    UDPServer server(std::move(sock), callbackUDP);
    io_context.run();  // blocks for as long as the server keeps a receive pending
  }
  catch (const std::exception& e)
  {
      std::cerr << e.what() << std::endl;
      return 1;  // report the failure instead of exiting with status 0
  }
  return 0;
}

UDPServer.h:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

// Asynchronous UDP receiver: owns the socket it is given and passes the payload
// of every received datagram to a user-supplied callback.
class UDPServer
{
public:
    // Handler invoked once per datagram with the received payload bytes.
    using Callback = std::function<void(const std::vector<char>&)>;

    // Takes ownership of `socket` and keeps `callbackFunction` for later calls.
    explicit UDPServer(boost::asio::ip::udp::socket socket, Callback callbackFunction);
    virtual ~UDPServer();

private:
    // Queues one asynchronous receive on socket_.
    void read();

    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;  // handed to async_receive_from as the sender endpoint
    Callback callbackFunction_;
    char data_[1500 + 1];  // 1500 payload bytes (max of an Ethernet frame) plus one byte for the '\0' terminator
};

UDPServer.cpp:

#include <iostream>
#include "UDPServer.h"

// Takes ownership of `socket`, stores the callback and immediately queues the
// first asynchronous receive.
// `callbackFunction` is a by-value sink parameter, so it is moved into the
// member instead of being copied a second time.
UDPServer::UDPServer(boost::asio::ip::udp::socket socket, std::function<void(const std::vector<char>&)> callbackFunction):
socket_(std::move(socket)),
callbackFunction_(std::move(callbackFunction))
{
    read();
}

// Nothing to release by hand: every member cleans up after itself.
UDPServer::~UDPServer() = default;

// Queues one asynchronous receive. When a datagram arrives its payload is
// copied into a vector, passed to the callback, and the next receive is queued.
// The chain ends (no further receive is queued) on a socket error or when a
// datagram consisting of exactly one '\n' arrives.
void UDPServer::read()
{
    // Receive into all but the last byte of data_; that byte is reserved for
    // the terminator written in the handler.
    socket_.async_receive_from(boost::asio::buffer(data_, sizeof(data_) - 1), endpoint_,
        [this](boost::system::error_code ec, std::size_t length)
        {
            if (ec)
            {
                return;
            }
            data_[length] = '\0';
            // Compare by length rather than with strcmp(): a payload that merely
            // starts with "\n\0" is ordinary data, and no <cstring> is required.
            const bool isStopDatagram = (length == 1 && data_[0] == '\n');
            if (isStopDatagram)
            {
                return;
            }
            callbackFunction_(std::vector<char>(data_, data_ + length));
            read();
        }
    );
}

Now I want to convert this into a class whose constructor takes only the port and a callback function (let's forget about the latter and just print the message for now; adding the callback is normally no problem).

I tried the following, but it doesn't work:

// Entry point of the class-based attempt: the server is built from the port alone.
int main(int argc, char* const argv[])
{
  UDPServer server{60001};  // goes out of scope, and is destroyed, when main returns
  return 0;
}

UDPServer.h:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

// UDP server that owns its io_context and runs it on a private thread.
// Members are initialised in the order they are declared below, regardless of
// the order in which the constructor's initializer list names them.
class UDPServer
{
public:
    // Creates a UDP/IPv4 socket for `port` and starts the worker thread.
    explicit UDPServer(uint16_t port);
    // Stops the io_context and joins the worker thread.
    virtual ~UDPServer();
private:
    boost::asio::io_context io_context_;  // declared first: socket_ is constructed from it
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;  // passed to async_receive_from as the sender endpoint
    std::array<char, BUFFERSIZE> recv_buffer_;  // storage for one incoming datagram
    std::thread thread_;  // executes run(); declared last, so it starts after the other members exist
    // Thread body: runs the io_context and reports why it ended.
    void run();
    // Queues one asynchronous receive into recv_buffer_.
    void start_receive();
    // Completion handler of the receive queued by start_receive().
    void handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred);
};

UDPServer.cpp:

#include <iostream>
#include "UDPServer.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <iostream>

// Creates the socket for `port` on UDP/IPv4, launches the worker thread and
// then queues the first receive.
// NOTE(review): the initializer list starts thread_ before start_receive() has
// queued any work. io_context::run() returns as soon as it has nothing to do,
// so the worker can finish before the first receive exists.
// NOTE(review): the initializers are listed in a different order than the
// members are declared; the declaration order is the one that applies.
UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
thread_(&UDPServer::run, this)
{
        start_receive();
}

// Stops the io_context and waits for the worker thread to finish.
UDPServer::~UDPServer()
{
    io_context_.stop();  // makes io_context_.run() return even if a receive is still pending
    thread_.join();
}

void UDPServer::start_receive()
{
    socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_,
        boost::bind(&UDPServer::handle_reply, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

// Completion handler for start_receive(): prints the received datagram (or the
// error message) and then queues the next receive.
//   error             - result of the receive operation
//   bytes_transferred - number of bytes written into recv_buffer_
void UDPServer::handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (!error)
    {
        try {
            // Only the first bytes_transferred bytes of the buffer belong to this datagram.
            std::string string(recv_buffer_.data(), recv_buffer_.data() + bytes_transferred);
            std::cout << "Message received: " << std::to_string(bytes_transferred) << ", " << string << std::endl;
        }
        // NOTE(review): catching by value copies the exception into a plain
        // std::exception; catching by const reference would preserve the
        // derived type and its what() text.
        catch (std::exception ex) {
            std::cout << "handle_reply: Error parsing incoming message:" << ex.what() << std::endl;
        }
        catch (...) 
        {
            std::cout << "handle_reply: Unknown error while parsing incoming message" << std::endl;
        }
    }
    else
    {
        std::cout << "handle_reply: error: " << error.message() << std::endl;
    }
    // NOTE(review): this is reached after errors too, so the receive is
    // re-queued even when the previous one failed.
    start_receive();
}

// Body of the worker thread: drives the io_context once, logs any exception
// that escapes a handler, and announces when the thread is done.
void UDPServer::run()
{
    try
    {
        io_context_.run();
    }
    catch (const std::exception& e)
    {
        std::cout << "Server network exception: " << e.what() << '\n' << std::flush;
    }
    catch (...)
    {
        std::cout << "Unknown exception in server network thread" << '\n' << std::flush;
    }

    std::cout << "Server network thread stopped" << '\n' << std::flush;
}

When I run this, I get "Server network thread stopped". The io_context doesn't seem to start, and it doesn't block. Does anyone have an idea what I am doing wrong? Thanks a lot!

EDIT: I tried the following after a comment, with the same result (except that the message now appears after 1 second):

// Variant that queues the first receive, waits one second and only then
// starts the worker thread, so run() already has work when it begins.
// NOTE(review): the main() shown for this class destroys the server as soon as
// the constructor returns, and the destructor calls io_context_.stop(); that
// would explain why the thread still ends right after the one-second delay.
// NOTE(review): std::chrono::seconds needs <chrono>, which the listed includes
// do not name explicitly.
UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port))
{
        start_receive();
        std::this_thread::sleep_for (std::chrono::seconds(1));
        thread_ = std::thread(&UDPServer::run, this);
}

Solution

  • Your destructor explicitly tells the service to stop:

    // Quoted from the question: stop() makes run() return even though a
    // receive may still be pending, and join() then waits for the thread to exit.
    UDPServer::~UDPServer() {
        io_context_.stop();
        thread_.join();
    }
    

    That's part of your problem. The other part is as pointed out in the comment: you have a race condition where the thread exits before you even post your first async operation.

    Solve it by adding a work guard:

    // The guard counts as outstanding work on io_, so io_.run() does not return
    // for lack of work until the guard is reset. io_ must be declared first
    // because the guard is built from its executor.
    boost::asio::io_context io_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_ {io_.get_executor()};
    

    Now the destructor can be:

    // Releases the work guard and waits for the worker thread. run() only
    // returns once no operation is outstanding, so join() keeps waiting for as
    // long as a receive is still queued.
    UDPServer::~UDPServer() {
        work_.reset(); // allow service to run out of work
        thread_.join();
    }
    

    Other notes:

    • avoid chaining back to start_receive when there was an error

    • std::to_string was redundant

    • the order of initialization for members is defined by the order of their declaration, not by the order of their initializers in the initializer list. Catch these bug sources with -Wall -Wextra -pedantic

    • handle exceptions in your service thread (see Should the exception thrown by boost::asio::io_service::run() be caught?)

    • I'd suggest std::bind over boost::bind:

      std::bind(&UDPServer::handle_reply, this,
                std::placeholders::_1,
                std::placeholders::_2));
      
    • Or just use a lambda:

       [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); });
      

    LIVE DEMO

    Compiler Explorer

    #include <boost/asio.hpp>
    #include <fstream>
    #include <functional>
    #include <iomanip>
    #include <iostream>
    #include <thread>
    #include <vector>
    
    using boost::asio::ip::udp;
    using boost::system::error_code;
    using boost::asio::io_context;
    
    #define BUFFERSIZE 1501
    
    // UDP server that owns its io_context and drives it from a private worker thread.
    // Members are initialised in declaration order, so the order below matters:
    // io_ comes before work_ and socket_ (both are built from it), endpoint_
    // before socket_ (the socket is constructed with it), and thread_ last so
    // that run() starts only after every other member exists.
    class UDPServer {
      public:
        // Creates a UDP/IPv4 socket for `port`, starts the worker thread and queues the first receive.
        explicit UDPServer(uint16_t port);
        // Releases the work guard and joins the worker thread.
        virtual ~UDPServer();
    
      private:
        io_context io_;
        // Counts as outstanding work so io_.run() does not return before the first receive is queued.
        boost::asio::executor_work_guard<io_context::executor_type> work_ {io_.get_executor()};
        udp::endpoint endpoint_; // local endpoint for socket_; also passed to async_receive_from as the sender endpoint
        udp::socket socket_;
        std::array<char, BUFFERSIZE> recv_buffer_; // storage for one incoming datagram
        std::thread thread_; // executes run()
    
        // Thread body: runs io_ until it reports that it handled nothing.
        void run();
        // Queues one asynchronous receive into recv_buffer_.
        void start_receive();
        // Completion handler of the receive queued by start_receive().
        void handle_reply(const error_code& error, size_t transferred);
    };
    
    // Builds the UDP/IPv4 endpoint for `port`, constructs the socket from it,
    // launches the worker thread and finally queues the first receive. The work
    // guard declared in the class keeps run() alive until that receive exists.
    UDPServer::UDPServer(uint16_t port)
        : endpoint_(udp::v4(), port)
        , socket_(io_, endpoint_)
        , thread_(&UDPServer::run, this)
    {
        start_receive();
    }
    
    // Releases the work guard, then waits for the worker thread to leave run().
    // While a receive is still queued the io_context has work, so join() keeps
    // waiting; start_receive() is not re-armed only after a receive error.
    UDPServer::~UDPServer() {
        work_.reset(); // allow service to run out of work
        thread_.join();
    }
    
    // Queues a single asynchronous receive into recv_buffer_. On completion the
    // handler forwards the error code and byte count to handle_reply(). An
    // equivalent handler can be written with std::bind and std::placeholders.
    void UDPServer::start_receive() {
        auto on_receive = [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); };

        socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_, on_receive);
    }
    
    // Completion handler for start_receive().
    //   error       - result of the receive; on failure it is logged and the
    //                 receive chain ends (no new receive is queued)
    //   transferred - number of bytes written into recv_buffer_
    // On success the datagram is printed, quoted, and the next receive is queued.
    void UDPServer::handle_reply(const error_code& error, size_t transferred) {
        if (error) {
            std::cout << "handle_reply: error: " << error.message() << "\n";
            return;
        }

        try {
            // View only the bytes that belong to this datagram; no copy is made.
            const std::string_view payload(recv_buffer_.data(), transferred);
            std::cout << "Message received: " << transferred << ", "
                      << std::quoted(payload) << "\n";
        } catch (std::exception const& ex) {
            std::cout << "handle_reply: Error parsing incoming message:"
                      << ex.what() << "\n";
        } catch (...) {
            std::cout
                << "handle_reply: Unknown error while parsing incoming message\n";
        }

        start_receive();
    }
    
    // Body of the worker thread. Calls io_.run() repeatedly: an exception thrown
    // by a handler is logged and run() is entered again; the loop ends once
    // run() returns having executed no handlers.
    void UDPServer::run() {
        bool nothing_left = false;
        while (!nothing_left) {
            try {
                nothing_left = (io_.run() == 0u);
            } catch (const std::exception& e) {
                std::cout << "Server network exception: " << e.what() << "\n";
            } catch (...) {
                std::cout << "Unknown exception in server network thread\n";
            }
        }
        std::cout << "Server network thread stopped\n";
    }
    
    // Demo entry point: starts a server on port 60001.
    int main() {
        // Flush std::cout after every output operation so messages show up immediately.
        std::cout.setf(std::ios_base::unitbuf);

        // The server's destructor joins its worker thread, so main stays inside
        // that destructor while the server is still receiving.
        UDPServer server{60001};
    }
    

    Testing with random words:

    sort -R /etc/dictionaries-common/words | while read w; do sleep 1; netcat -u localhost 60001 -w 0 <<<"$w"; done
    

    Live output:

    enter image description here