Search code examples
c++boostboost-asio

Boost asio. Client udp efficiency


I have implemented a udp session using a multi-threaded environment.

    using RawDataArray=std::array <unsigned char,65000>;
    
    // Fixed-capacity byte buffer: a 65000-byte array plus a logical size
    // (m_n_avail).  Copying duplicates the whole array (the "ctor cpy"
    // traces make that visible), so ownership is normally transferred by
    // moving the wrapping unique_ptr (DataBufferPtr) instead.
    class StaticBuffer
    {
    private:
        RawDataArray                            m_data;
        std::size_t                             m_n_avail;   // logical payload size in bytes
    public:
    
        StaticBuffer():m_data(),m_n_avail(0){}
        // Pre-sized buffer.  Value-initialize the storage: the previous
        // version left m_data indeterminate, so any byte not explicitly
        // written was garbage when the buffer was sent.
        StaticBuffer(std::size_t n_bytes):m_data(),m_n_avail(n_bytes){}
        StaticBuffer(const StaticBuffer& other)
        {
            std::cout<<"ctor cpy\n";
            m_data=other.m_data;
            m_n_avail=other.m_n_avail;
        }
        // Copy the data but override the logical size.
        StaticBuffer(const StaticBuffer& other,std::size_t n_bytes)
        {
            std::cout<<"ctor cpy\n";
            m_data=other.m_data;
            m_n_avail=n_bytes;
        }
        StaticBuffer(const RawDataArray& data,std::size_t n_bytes)
        {
            std::cout<<"ctor static buff\n";
            m_data=data;
            m_n_avail=n_bytes;
        }
        // Set the logical size (e.g. to bytes_transferred after a receive).
        void set_size(int n)
        {
            m_n_avail=n;
        }
        void set_max_size(){m_n_avail=m_data.size();}
        std::size_t max_size()const {return m_data.size();}
        unsigned char& operator[](std::size_t i){return m_data[i];}
        const unsigned char& operator[] (std::size_t i)const{return m_data[i];}
        StaticBuffer& operator=(const StaticBuffer& other)
        {
            if (this == &other)
                return *this;
            m_data = other.m_data;
            m_n_avail = other.m_n_avail;
            return *this;
        }
        // Append one byte; throws when the fixed capacity is exhausted.
        void push_back(unsigned char val)
        {
            if (m_n_avail<m_data.size())
            {
                m_data[m_n_avail]=val;
                ++m_n_avail;   // BUG FIX: the size was never advanced, so every
                               // push_back overwrote slot 0 and size() stayed 0
            }else
                throw "Out of memory";
        }
        void reset(){m_n_avail=0;}
    
        unsigned char* data(){return m_data.data();}
        const unsigned char* data()const {return m_data.data();}
        std::size_t size()const{return m_n_avail;}
    
        ~StaticBuffer(){}
    };
    
        class UDPSeassion;
        using DataBuffer = StaticBuffer;
        using DataBufferPtr=std::unique_ptr<DataBuffer>;
        // Invoked (via asio::post) once per received datagram; takes
        // ownership of the buffer.
        using ExternalReadHandler=std::function<void(DataBufferPtr)>;
        
        // UDP client session running on a shared io_context: sends are
        // serialized through m_send_strand, receives through m_rcv_strand.
        // NOTE(review): enable_shared_from_this is inherited but no handler
        // captures shared_from_this() (they capture a raw `this`), and
        // instances are created on the stack in main, so calling
        // shared_from_this() here would be undefined behaviour.
        class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
        {
        private:
            asio::io_context&           m_ctx;
            asio::ip::udp::socket       m_socket;
            asio::ip::udp::endpoint     m_endpoint;     // peer address; also overwritten by async_receive_from
            std::string                 m_addr;
            unsigned short              m_port;
        
            asio::io_context::strand    m_send_strand;  // serializes access to m_dq_send
            std::deque<DataBufferPtr>   m_dq_send;      // pending outgoing datagrams; front is in flight
        
        
            asio::io_context::strand    m_rcv_strand;   // serializes the receive chain
            DataBufferPtr               m_rcv_data;     // buffer for the in-flight receive
        
            ExternalReadHandler         external_rcv_handler;
        
        private:
            // Sends the datagram at the queue front; the completion handler
            // pops it and recurses until the queue drains.  Must only run
            // on m_send_strand so the deque is never accessed concurrently.
            void do_send_data_from_dq()
            {
                if (m_dq_send.empty())
                    return;
        
                m_socket.async_send_to(
                            asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
                            m_endpoint,
                            asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
                    if (!er)
                    {
                        m_dq_send.pop_front();
                        do_send_data_from_dq();
        
                    }else
                    {
                        // post to logger
                    }
                }));
            }
        
            // Receive completion: trims the buffer to the received length,
            // hands it to the external handler off-strand via post, then
            // re-arms a fresh full-capacity buffer and issues the next read.
            // On error the read chain simply stops (no reporting).
            void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
            {
                if (!er)
                {
                    m_rcv_data->set_size(bytes_transferred);
                    asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
                    m_rcv_data=std::make_unique<DataBuffer>();
                    m_rcv_data->set_max_size();
                    async_read();
                }
            }
        
        public:
        
            UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
                m_ctx(ctx),
                m_socket(ctx),
                m_endpoint(asio::ip::address::from_string(addr),port),
                m_addr(addr),
                m_port(port),
                m_send_strand(ctx),
                m_dq_send(),
                m_rcv_strand(ctx),
                m_rcv_data(std::make_unique<DataBuffer>(65000))
        
            {}
            ~UDPSeassion(){}
        
            const std::string& get_host()const{return m_addr;}
            unsigned short get_port(){return m_port;}
            // Stores the datagram consumer.  Since it erases to
            // std::function anyway, the template adds nothing over taking
            // ExternalReadHandler by value and moving it.
            template<typename ExternalReadHandlerCallable>
            void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
            {
                external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
            }
            // Opens the socket and starts the receive loop.
            // NOTE(review): no bind() is issued, so the OS assigns an
            // ephemeral local port -- confirm this is intended.
            void start()
            {
                m_socket.open(asio::ip::udp::v4());
                async_read();
            }
        
            // Arms one async receive; completion is dispatched through
            // m_rcv_strand into do_read.
            void async_read()
            {
                m_socket.async_receive_from(
                            asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
                            m_endpoint,
                            asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
                            );
            }
        
            // Queues a datagram for sending; starting the send loop only
            // when the queue transitions 0 -> 1 keeps exactly one
            // async_send_to in flight.
            // NOTE(review): post(m_ctx, bind_executor(m_send_strand, ...))
            // mixes two executors -- posting directly to the strand would
            // express the intent unambiguously.
            void async_send(DataBufferPtr pData)
            {
                asio::post(m_ctx,
                           asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
                                                                                                m_dq_send.emplace_back(std::move(pDt));
                                                                                                if (m_dq_send.size()==1)
                                                                                                    do_send_data_from_dq();
                                                                                                }));
            }
        };

// Consumer callback for received datagrams; runs on an io_context thread
// via asio::post.  A real implementation would decode the raw bytes and
// push the result onto a mutex-protected queue; this demo only prints the
// buffer for inspection.  The whole line is pre-built in a local stream so
// concurrent handlers do not interleave their output mid-line.
void handler_read(DataBufferPtr pdata)
{
    std::stringstream line;
    line << "thread handler: " << std::this_thread::get_id()
         << " " << pdata->data()
         << " " << pdata->size() << std::endl;
    std::cout << line.str() << std::endl;
}
int main()
{
    asio::io_context ctx;
    //auto work_guard = asio::make_work_guard(ctx);
    std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;

    // Build the 4-byte payload "ABC\n".
    StaticBuffer b{4};
    b[0]='A';
    b[1]='B';
    b[2]='C';
    b[3]='\n';   // BUG FIX: was b[4] -- the payload is 4 bytes (indices 0-3),
                 // so byte 3 was left unset and the '\n' landed outside it

    UDPSeassion client(ctx,"127.0.0.1",11223);
    client.set_read_data_headnler(handler_read);
    client.start();

    // Three worker threads plus the main thread all run the same context.
    std::vector<std::thread> threads;

    for (int i=0;i<3;++i)
    {
        threads.emplace_back([&](){
            std::stringstream ss;
            ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
            std::cout<<ss.str();
            // NOTE(review): an exception escaping run() terminates this
            // thread silently; production code should wrap this in try/catch.
            ctx.run();
            std::cout<<"end thread\n";
        }
        );
    }

    client.async_send(std::make_unique<StaticBuffer>(b));
    ctx.run();

    for (auto& t:threads)
        t.join();

    return 0;   // BUG FIX: returning 1 signals failure to the calling shell
}

In the code above, the main emphasis is on the UDPSeassion class. The StaticBuffer class is written to provide the basic buffer functionality. I have some questions:

  1. Suppose that this class will be built into a system that works with a frequency of ~ 100 Hz. Every 10ms, the system will send its state through the client. 1.1 Is it properly done for a multi-threaded environment? How efficient is this implementation? 1.2 How efficient is a client implementation that contains only one thread within itself that serves reading and writing? example
  2. Is buffer transfer between tasks correct? (std::move(unique_ptr_data))
  3. In practice, how many threads are given to the client to process reads and writes?
  4. For TCP client?

I will be very grateful for detailed answers to my questions. Thank you very much))


Solution

  • I'd simplify a ton.

    • you "use" enable_shared_from_this but none of the asynchronous operations capture shared_from_this. In fact, you don't even allocate UDPSession shared, so it would be Undefined Behaviour to use shared_from_this at all.

    • no-op destructors are implied. If you must declare them, = default them

    • the m_rcv_strand is deprecated - use strand<> instead

    • why is there a separate strand for send/receive? Sure, 1 read operation is allowed to overlap 1 write operation, but you still cannot access the shared objects (like m_socket) without proper synchronization

    • you have strands but seem to erroneously not post to them where relevant (e.g. post(m_ctx, bind_executor(m_send_strand, ....)) is conflicting)

    • you have a laborious buffer type that /seemingly/ aims to avoid allocation, but you wrap it in a unique_ptr anyways ¯\(ツ)

    • set_read_data_handler doesn't need to be a template. Since you're erasing to std::function anyways, there's zero benefit over just using:

       void set_read_data_handler(ExternalReadHandler handler) {
            external_rcv_handler = std::move(handler);
       }
      
    • You have repeated magic constants (e.g. 65000)

    • You seem to be missing a socket bind() call

    In short I'd replace the buffer with something sensible:

    using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
    static_assert(StaticBuffer::static_capacity == 65000);
    

    Since you seem to expect text protocol, chances are your average message is (much) smaller, so I reckon it may be much faster to just use std::string or even boost::container::small_vector<...>.

    Not really required but to allow for elegant, asio-standard use:

    using asio::buffer;
    static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
    static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
    

    See a much simplified version Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/container/static_vector.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <list>
    #include <thread>
    
    // NOTE(review): std::mutex / std::atomic_int arrive transitively through
    // <boost/asio.hpp>; including <mutex> and <atomic> directly would be safer.
    namespace { // user-friendly logging
        static std::mutex      s_console_mx;
        static std::atomic_int t_id_gen = 0;
        thread_local int       t_id     = ++t_id_gen;  // small per-thread id for readable traces
    
        // Serialized variadic console logger: one line per call, prefixed
        // with the calling thread's id.
        template <typename... T> static inline void trace(T const&... args) {
            std::lock_guard lk(s_console_mx);
            ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
        }
    } // namespace
    
    namespace asio = boost::asio;
    using asio::ip::udp;
    
    // Fixed-capacity buffer: size() tracks the payload; storage never allocates.
    using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
    static_assert(StaticBuffer::static_capacity == 65000);
    
    // not really required but to allow for elegant, asio-standard use:
    using asio::buffer;
    static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
    static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
    
    using ExternalReadHandler = std::function<void(StaticBuffer&&)>;
    
    // UDP session with all socket access serialized on one implicit strand
    // (the socket's executor), so no separate send/receive strands are needed.
    class UDPSession {
      private:
        using error_code = boost::system::error_code;
        asio::any_io_executor m_ex;
        std::string           m_addr;
        uint16_t              m_port;
    
        udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
        udp::socket   m_socket{make_strand(m_ex)};  // all socket handlers run on this strand
    
        std::deque<StaticBuffer> m_dq_send;  // pending datagrams; front is in flight
        StaticBuffer             m_rcv_data;
        ExternalReadHandler      external_rcv_handler;
    
      public:
        UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
            : m_ex(ex)
            , m_addr(addr)
            , m_port(port) {}
    
        std::string const& get_host() const { return m_addr; }
        uint16_t           get_port() { return m_port; }
        void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
    
        // Opens and binds the socket, then starts the receive loop.
        void start() {
            m_socket.open(udp::v4());
            m_socket.bind(m_endpoint);
            do_read();
        }
    
        // Thread-safe: hops onto the socket strand before touching the
        // queue.  Kicking the loop only on the 0 -> 1 transition keeps
        // exactly one async_send_to in flight.
        void send(StaticBuffer data) {
            asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
                m_dq_send.emplace_back(std::move(d));
                if (m_dq_send.size() == 1)
                    send_loop();
            });
        }
    
      private:
        void do_read() {
            m_rcv_data.assign(m_rcv_data.static_capacity, '\0');  // expose full capacity to the read
            m_socket.async_receive_from(
                buffer(m_rcv_data), m_endpoint,
                std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
        }
    
        void on_read(error_code er, size_t bytes_transferred) {
            if (!er) {
                m_rcv_data.resize(bytes_transferred);  // shrink to the actual datagram length
                // Hand the datagram to the user handler off the socket strand.
                asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
                    external_rcv_handler(std::move(data));
                });
                do_read();
            }
        }
    
        // Sends the queue front; the completion handler pops it and
        // recurses until the queue drains.  Runs on the socket strand.
        void send_loop() {
            if (m_dq_send.empty())
                return;
    
            m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
                                   [this](error_code er, size_t /*bytes_transferred*/) {
                                       if (!er) {
                                           m_dq_send.pop_front();
                                           send_loop();
                                       } // else { /* post to logger */ }
                                   });
        }
    };
    
    // Demo consumer: prints the datagram as text (without the trailing '\n').
    void handler_read(StaticBuffer&& pdata) {
        if (!pdata.empty()) {
            std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
            trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));  // std::quoted found via ADL
        }
    }
    
    int main() {
        asio::io_context ctx;
        auto work_guard = asio::make_work_guard(ctx);  // keep run() from returning early
    
        trace("Main thread");
    
        std::list<std::thread> threads;
    
        for (int i = 0; i < 3; ++i)
            threads.emplace_back([&]() {
                trace("START");
                ctx.run();
                trace("END");
            });
    
        UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
        client.set_read_data_handler(handler_read);
        client.start();
        client.send({'A', 'B', 'C', '\n'});
    
        work_guard.reset();  // let run() return once outstanding work drains
    
        for (auto& t : threads)
            t.join();
    }
    

    The live demo on Coliru "eats" words from main.cpp. Here's a local dictionary demo:

    enter image description here

    EXTRA: Thread Pool, shared_from_this

    You might have noticed I changed to any_io_executor instead of io_context&. That way you can easily switch to asio::thread_pool instead of doing it manually (poorly¹).

    Let's also re-instate shared_from_this, but correctly this time.

    For simplicity I've used a static buffer ONLY for the receive buffer (because that's how datagram protocols roll), and just used vector (or small_vector) for the DataBuffer.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    
    // NOTE(review): std::mutex / std::atomic_int arrive transitively through
    // <boost/asio.hpp>; including <mutex> and <atomic> directly would be safer.
    namespace { // user-friendly logging
        static std::mutex      s_console_mx;
        static std::atomic_int t_id_gen = 0;
        thread_local int       t_id     = ++t_id_gen;  // small per-thread id for readable traces
    
        // Serialized variadic console logger: one line per call, prefixed
        // with the calling thread's id.
        template <typename... T> static inline void trace(T const&... args) {
            std::lock_guard lk(s_console_mx);
            ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
        }
    } // namespace
    
    namespace asio = boost::asio;
    using asio::ip::udp;
    
    //using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
    using DataBuffer = std::vector<uint8_t>;
    using ExternalReadHandler = std::function<void(DataBuffer&&)>;
    
    // Same session as before, but every async handler captures
    // shared_from_this() so the session stays alive as long as any
    // operation is outstanding.  A fixed std::array is used only as the
    // receive scratch buffer; delivered datagrams are copied into a
    // right-sized DataBuffer.
    class UDPSession : public std::enable_shared_from_this<UDPSession> {
      private:
        using error_code = boost::system::error_code;
        asio::any_io_executor m_ex;
        std::string           m_addr;
        uint16_t              m_port;
    
        udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
        udp::socket   m_socket{make_strand(m_ex)};  // all socket handlers run on this strand
    
        std::deque<DataBuffer>     m_dq_send;   // pending datagrams; front is in flight
        std::array<uint8_t, 65000> m_rcv_data;  // receive scratch (max UDP payload we accept)
        ExternalReadHandler        external_rcv_handler;
    
      public:
        UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
            : m_ex(ex)
            , m_addr(addr)
            , m_port(port) {}
    
        std::string const& get_host() const { return m_addr; }
        uint16_t           get_port() { return m_port; }
        void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
    
        // Opens and binds the socket, then starts the receive loop.
        void start() {
            m_socket.open(udp::v4());
            m_socket.bind(m_endpoint);
            do_read();
        }
    
        // Thread-safe: hops onto the socket strand; `self` keeps the
        // session alive until the posted task runs.
        void send(DataBuffer data) {
            asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
                m_dq_send.emplace_back(std::move(d));
                if (m_dq_send.size() == 1)
                    send_loop();
            });
        }
    
      private:
        void do_read() {
            using namespace std::placeholders;
            m_socket.async_receive_from( //
                asio::buffer(m_rcv_data), m_endpoint,
                std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));  // bound shared_ptr keeps us alive
        }
    
        void on_read(error_code er, size_t bytes_transferred) {
            if (!er) {
                // Copy only the received bytes into a right-sized buffer and
                // deliver it off the socket strand.
                asio::post(
                    m_ex,
                    [this, self=shared_from_this(),
                     data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
                        external_rcv_handler(std::move(data));
                    });
                do_read();
            }
        }
    
        // Sends the queue front; the completion handler pops it and
        // recurses until the queue drains.  Runs on the socket strand.
        void send_loop() {
            if (m_dq_send.empty())
                return;
    
            m_socket.async_send_to( //
                asio::buffer(m_dq_send.front()), m_endpoint,
                [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                    if (!er) {
                        m_dq_send.pop_front();
                        send_loop();
                    } // else { /* post to logger */ }
                });
        }
    };
    
    // Demo consumer: prints the datagram as text (without the trailing '\n').
    void handler_read(DataBuffer&& pdata) {
        if (!pdata.empty()) {
            std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
            trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));  // std::quoted found via ADL
        }
    }
    
    int main() {
        trace("Main thread");
    
        asio::thread_pool ctx(4);  // pool replaces manual io_context + threads
    
        {
            auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
            client->set_read_data_handler(handler_read);
            client->start();
            client->send({'A', 'B', 'C', '\n'});
        } // client stays alive through shared ownership
    
        ctx.join();
    }
    

    As icing on the cake, you can template the entire thing on the concrete executor type and avoid type-erasing the executor type:

    template <typename Executor>
    class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
        using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
    

    See it Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    
    // NOTE(review): std::mutex / std::atomic_int arrive transitively through
    // <boost/asio.hpp>; including <mutex> and <atomic> directly would be safer.
    namespace { // user-friendly logging
        static std::mutex      s_console_mx;
        static std::atomic_int t_id_gen = 0;
        thread_local int       t_id     = ++t_id_gen;  // small per-thread id for readable traces
    
        // Serialized variadic console logger: one line per call, prefixed
        // with the calling thread's id.
        template <typename... T> static inline void trace(T const&... args) {
            std::lock_guard lk(s_console_mx);
            ((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
        }
    } // namespace
    
    namespace asio = boost::asio;
    using asio::ip::udp;
    
    //using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
    using DataBuffer = std::vector<uint8_t>;
    using ExternalReadHandler = std::function<void(DataBuffer&&)>;
    
    // Final variant: the session is templated on the concrete executor
    // type, so the executor is not type-erased (no any_io_executor
    // indirection) and the socket's strand is strand<Executor>.
    template <typename Executor>
    class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
        using socket_t   = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
        using error_code = boost::system::error_code;
        Executor    m_ex;
        std::string m_addr;
        uint16_t    m_port;
    
        udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
        socket_t      m_socket{make_strand(m_ex)};  // all socket handlers run on this strand
    
        std::deque<DataBuffer>     m_dq_send;   // pending datagrams; front is in flight
        std::array<uint8_t, 65000> m_rcv_data;  // receive scratch (max UDP payload we accept)
        ExternalReadHandler        external_rcv_handler;
    
        // Dependent base: pull shared_from_this into scope for unqualified use.
        using std::enable_shared_from_this<UDPSession>::shared_from_this;
      public:
        UDPSession(Executor ex, std::string const& addr, uint16_t port)
            : m_ex(ex)
            , m_addr(addr)
            , m_port(port) {}
    
        std::string const& get_host() const { return m_addr; }
        uint16_t           get_port() { return m_port; }
        void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
    
        // Opens and binds the socket, then starts the receive loop.
        void start() {
            m_socket.open(udp::v4());
            m_socket.bind(m_endpoint);
            do_read();
        }
    
        // Thread-safe: hops onto the socket strand; `self` keeps the
        // session alive until the posted task runs.
        void send(DataBuffer data) {
            asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
                m_dq_send.emplace_back(std::move(d));
                if (m_dq_send.size() == 1)
                    send_loop();
            });
        }
    
      private:
        void do_read() {
            using namespace std::placeholders;
            m_socket.async_receive_from( //
                asio::buffer(m_rcv_data), m_endpoint,
                std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));  // bound shared_ptr keeps us alive
        }
    
        void on_read(error_code er, size_t bytes_transferred) {
            if (!er) {
                // Copy only the received bytes into a right-sized buffer and
                // deliver it off the socket strand.
                auto f = m_rcv_data.data(), l = f + bytes_transferred;
                asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
                    self->external_rcv_handler(std::move(data));
                });
                do_read();
            }
        }
    
        // Sends the queue front; the completion handler pops it and
        // recurses until the queue drains.  Runs on the socket strand.
        void send_loop() {
            if (m_dq_send.empty())
                return;
    
            m_socket.async_send_to( //
                asio::buffer(m_dq_send.front()), m_endpoint,
                [this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
                    if (!er) {
                        m_dq_send.pop_front();
                        send_loop();
                    } // else { /* post to logger */ }
                });
        }
    };
    
    // Demo consumer: prints the datagram as text (without the trailing '\n').
    void handler_read(DataBuffer&& pdata) {
        if (!pdata.empty()) {
            std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
            trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));  // std::quoted found via ADL
        }
    }
    
    int main() {
        trace("Main thread");
    
        using Ex = asio::thread_pool::executor_type;
        asio::thread_pool ctx(4);  // pool replaces manual io_context + threads
    
        {
            auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
            client->set_read_data_handler(handler_read);
            client->start();
            client->send({'A', 'B', 'C', '\n'});
        } // client stays alive through shared ownership
    
        ctx.join();
    }
    

    Another local demo:

    enter image description here


    ¹ you needed at least exception handling in the runner threads: Should the exception thrown by boost::asio::io_service::run() be caught?