Search code examples
c++boostboost-asio

How to use boost resolver with async connect?


I'm trying to add resolver to my async UDP socket connection class. I use connect/send interface and I want to resolve and connect in connect, and send data in send. I expect that resolve async in callback give me a range of endpoints and I should iterate over it to find available endpoint. So I implemented connect method, that checks all endpoints, but it looks monstrous and unjustifiably complex.

void UdpConnection::Connect()
{
    std::promise<void> connectPromise;
    auto connectFuture = connectPromise.get_future();

    auto handleResolveCallback = [this, p = std::move(connectPromise), self = shared_from_this()]
        (const boost::system::error_code& ec, boost::asio::ip::udp::resolver::iterator endpoint_iterator) mutable 
            {
                if (ec)
                {
                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                }
                else
                {
                    while (endpoint_iterator != boost::asio::ip::udp::resolver::iterator())
                    {
                        try {
                            std::promise<void> connectPromise2;
                            auto connectFuture2 = connectPromise2.get_future();

                            boost::asio::ip::udp::endpoint iter = *endpoint_iterator;
                            boost::asio::post(m_socket.get_executor(),
                                [this, p = std::move(connectPromise2), self = shared_from_this(), iter]() mutable { DoConnect(std::move(p), std::move(iter)); });

                            connectFuture2.get();
                            break;
                        }
                        catch (const std::exception& ex)
                        {
                            std::cout<< "Run-time exception: " << ex.what() << std::endl;
                            ++endpoint_iterator;
                        }
                    };
                }
            };
    
    m_resolver.async_resolve(boost::asio::ip::udp::v4(), m_to.c_str(), m_port.c_str(), std::move(handleResolveCallback));
    connectFuture.get();
}
void UdpConnection::DoConnect(std::promise<void> p, boost::asio::ip::udp::endpoint iter)
{
    auto handleConnect = [p = std::move(p), self = shared_from_this()](const boost::system::error_code& ec) mutable
    {
        EKA_TRACE_DEBUG_EX(self->GetTracer()) << TR << "Connect: " << ec.message();

        if (ec)
            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
        else {
            p.set_value();
        }
    };

    m_socket.async_connect(iter, std::move(handleConnect));
}

And I don't know why it doesn't call DoConnect, it stucks in connectFuture.get(). In traces:

@asio|1696946039.156806|0*1|[email protected](blk=never,rel=fork)
@asio|1696946039.156806|>1|
@asio|1696946039.157804|>0|
@asio|1696946039.874930|0*2|[email protected](blk=never,rel=fork)

What is the correct method to resolve and async connect?

Full example http://coliru.stacked-crooked.com/a/737a0927ecd74458

P.S. I know that UDP con't use connection, however TCP and UDP have similar interface in boost, so I think my problem is not in protocol.


Solution

  • You're posting the work, then immediately blocking on it - this is by definition useless. You could always just call the function you posted directly instead.

    And this highlights the problem: you block on the service thread, thus ensuring that no handlers can run. Which means, the DoConnect task will never run.

    Instead, organize the async call chain such that nothing ever blocks inside a (completion) handler. E.g. how I did it before with the related TCP question:

    Coliru

    #define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    namespace asio = boost::asio;
    namespace ssl  = asio::ssl;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::chrono_literals;
    using namespace std::placeholders;
    
    namespace protocoller::syslog {
        static inline std::ostream& EKA_TRACE_ERROR() {
            thread_local std::ostream instance(std::cout.rdbuf());
            return instance;
        }
    
        struct IConnection {
            virtual ~IConnection()                          = default;
            virtual void Connect()                          = 0;
            virtual void SendMessageAsync(std::string_view) = 0;
            virtual void Close()                            = 0;
        };
    
        struct TcpConnection
            : IConnection
            , std::enable_shared_from_this<TcpConnection> //
        {
            TcpConnection(asio::any_io_executor ex, tcp::endpoint ep, ssl::context& ctx)
                : m_socket(make_strand(ex), ctx)
                , m_ep(ep) {}
    
            void Connect() override {
                std::promise<void> p;
                auto f = p.get_future();
                asio::post(m_socket.get_executor(),
                           [this, p = std::move(p), self = shared_from_this()]() mutable { //
                               do_connect(std::move(p));
                           });
                f.get();
            }
    
            void Close() override {
                asio::post(m_socket.get_executor(), [this, self = shared_from_this()] { do_close(); });
            }
    
            void SendMessageAsync(std::string_view message) override {
                asio::post(m_socket.get_executor(),
                           [this, self = shared_from_this(), msg = std::string(message)]() mutable {
                               outbox_.push_back(std::move(msg));
                               if (outbox_.size() == 1)
                                   do_send_loop();
                           });
            }
    
          private:
            void do_connect(std::promise<void> p) {
                EKA_TRACE_ERROR() << "Connecting to: " << m_ep << std::endl;
                m_socket.next_layer().async_connect(
                    m_ep, [this, p = std::move(p), self = shared_from_this()](error_code ec) mutable {
                        EKA_TRACE_ERROR() << "Connect: " << ec.message() << std::endl;
                        if (ec)
                            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                        else {
                            m_socket.async_handshake( //
                                Stream::client, [p = std::move(p), self](error_code ec) mutable {
                                    EKA_TRACE_ERROR() << "Handshake: " << ec.message() << std::endl;
                                    if (ec)
                                        p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                                    else
                                        p.set_value();
                                });
                        }
                    });
            }
    
            void do_send_loop() {
                if (outbox_.empty())
                    return;
                EKA_TRACE_ERROR() << "SendLoop: " << quoted(outbox_.front()) << std::endl;
                outbox_.front() += "\n";
                asio::async_write( //
                    m_socket, asio::buffer(outbox_.front()),
                    [this, self = shared_from_this()](error_code ec, size_t /*bytes_transferred*/) {
                        EKA_TRACE_ERROR() << "HandleWritten: " << ec.message() << std::endl;
                        if (!ec) {
                            outbox_.pop_front();
                            do_send_loop();
                        }
                    });
            }
    
            void do_close() {
                EKA_TRACE_ERROR() << "Closing connection" << std::endl;
                m_socket.async_shutdown([self = shared_from_this()](error_code ec) {
                    EKA_TRACE_ERROR() << "Shutdown: " << ec.message() << std::endl;
                });
            }
    
          private:
            using Stream = ssl::stream<tcp::socket>;
            std::deque<std::string> outbox_;
            Stream                  m_socket;
            tcp::endpoint           m_ep;
        };
    } // namespace protocoller::syslog
    
    int main(int argc, char** argv) {
        asio::thread_pool ioc{1};
        ssl::context ctx{ssl::context::tls_client};
        ctx.set_default_verify_paths();
    
        using namespace protocoller::syslog;
        std::shared_ptr<IConnection> conn =
            std::make_shared<TcpConnection>(ioc.get_executor(), tcp::endpoint{{}, 12514}, ctx);
    
        conn->Connect();
    
        std::vector<std::string> words(argv+1, argv+argc);
        words.push_back("bye world");
        for (auto w : words)
            conn->SendMessageAsync("syslog tls client: " + w);
    
    
        std::this_thread::sleep_for(5000ms);
        conn->Close();
    
        ioc.join();
    }
    

    Update: Review

    Reading your full example there are... many problems.

    • your abstract base class lacks a virtual dtor

    • you are using different strands for things that should be on the same strand, fix:

           , m_socket(make_strand(ex), udp::v4())
           , m_resolver(m_socket.get_executor()) {}
      
    • you are mixing boost::enable_shared_from_this with std::shared_ptr

    • In your Connect you immediately invoke async_resolve, the idea is to post that to the strand

    • The completion handler(s) will actually be on the strand, so the asio::post you /had/ was useless there

    • asio::post is an async initiation, meaning it will always immediately return (and without exception), making the loop useless too.

    • Instead of manually enumerating endpoints, use asio::async_connect

    • You have a second promise. You only promise 1 thing. The second promise only causes you to want to block on the first. That was your entire issue. Don't. Just pass the original promise along until the connection actually completed

      P.S. Ironically, I later noticed that clearly you based your example on the above TCP TLS client (as evidenced by the "wrong" "syslog tls client: " string). As you will see in the summary below, it had exactly the solution that you needed before you removed it :)

    • You had changed the msg = std::string(msg) from the SendMessageAsync lambda. That creates UB because the string view may refer to memory no longer available when the async operation runs. This is also why it is extremely important that m_outbox contain std::string, not std::string_view.

    • You async_send_to to m_ep but m_ep is never assigned. Therefore, nothing will ever be sent (let alone arrive)

    • Since you're trying to test the resolver, you should probably change the constructor arguments from:

       std::shared_ptr<IConnection> conn =
           std::make_shared<UdpConnection>(ioc.get_executor(), "127.0.0.1", "514");
      

      To something more like:

       std::shared_ptr<IConnection> conn =
           std::make_shared<UdpConnection>(ioc.get_executor(), "localhost", "syslog");
      

    Fixing all these things you get:

    Live On Coliru

    // #define BOOST_ASIO_ENABLE_HANDLER_TRACKING
    #include <boost/asio.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    
    using namespace std::chrono_literals;
    namespace asio = boost::asio;
    
    struct IConnection {
        using error_code = boost::system::error_code;
        using udp        = asio::ip::udp;
    
        virtual ~IConnection()                          = default;
        virtual void Connect()                          = 0;
        virtual void SendMessageAsync(std::string_view) = 0;
        virtual void Close()                            = 0;
    };
    
    class UdpConnection
        : public IConnection
        , public std::enable_shared_from_this<UdpConnection> {
      public:
        UdpConnection(asio::any_io_executor ex, std::string to, std::string port);
    
        void Connect()                                  override; 
        void SendMessageAsync(std::string_view message) override; 
        void Close()                                    override; 
    
      private:
        void DoSendLoop();
        void HandleWritten(error_code ec, size_t bytes_transferred);
        void DoConnect(std::promise<void> p);
    
      private:
        const std::string m_to;
        const std::string m_port;
    
        udp::socket   m_socket;
        udp::resolver m_resolver;
        udp::endpoint m_ep;
    
        std::deque<std::string> m_outbox;
    };
    
    UdpConnection::UdpConnection(asio::any_io_executor ex, std::string to, std::string port)
        : m_to(std::move(to))
        , m_port(std::move(port))
        , m_socket(make_strand(ex), udp::v4())
        , m_resolver(m_socket.get_executor()) {}
    
    void UdpConnection::Connect() {
        std::promise<void> p;
        auto f = p.get_future();
    
        asio::post(                                                          //
            m_socket.get_executor(),                                         //
            [this, p = std ::move(p), self = shared_from_this()]() mutable { //
                DoConnect(std::move(p));
            });
        f.get();
    }
    
    void UdpConnection::SendMessageAsync(std::string_view message) {
        asio::post(m_socket.get_executor(),
                   [this, self = shared_from_this(), msg = std::string(message)]() mutable {
                       m_outbox.emplace_back(std::move(msg));
                       if (m_outbox.size() == 1)
                           DoSendLoop();
                   });
    }
    
    void UdpConnection::DoSendLoop()
    {
        if (m_outbox.empty())
            return;
    
        m_socket.async_send_to( //
            asio::buffer(m_outbox.front()), m_ep,
            bind(&UdpConnection::HandleWritten, shared_from_this(), std::placeholders::_1,
                 std::placeholders::_2));
    }
    
    void UdpConnection::Close()
    {
        try {
            if (m_socket.is_open()) {
                error_code ignoredEc;
                m_socket.shutdown(udp::socket::shutdown_both, ignoredEc);
                m_socket.close(ignoredEc);
            }
        } catch (std::exception const& ex) {
            std::cout<< "Run-time exception: " << ex.what() << std::endl;
        }
    }
    
    void UdpConnection::HandleWritten(error_code  ec, size_t /*bytes_transferred*/) {
        m_outbox.pop_front();
    
        if (!ec) {
            std::cout << "Connect: " << ec.message() << std::endl;
        }
    
        DoSendLoop();
    }
    
    void UdpConnection::DoConnect(std::promise<void> p) {
        m_resolver.async_resolve(
            udp::v4(), m_to, m_port,
            [this, p = std::move(p), self = shared_from_this()] //
            (error_code ec, udp::resolver::iterator eps) mutable {
                if (ec) {
                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                } else {
                    asio::async_connect( //
                        m_socket, eps,
                        [this, p = std::move(p), self = shared_from_this()] //
                        (error_code ec, udp::resolver::iterator it) mutable {
                            if (ec) {
                                std::cout << "Connect: " << ec.message() << std::endl;
                                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                            } else {
                                m_ep = it->endpoint();
                                std::cout << "Connect: " << m_ep << std::endl;
                                p.set_value();
                            }
                        });
                }
            });
    }
    
    int main(int argc, char** argv) {
        asio::thread_pool ioc{1};
    
        std::shared_ptr<IConnection> conn =
            std::make_shared<UdpConnection>(ioc.get_executor(), "localhost", "syslog");
    
        conn->Connect();
    
        std::vector<std::string> words(argv + 1, argv + argc);
        words.push_back("bye world");
        for (auto w : words)
            conn->SendMessageAsync("syslog udp client: " + w);
    
        std::this_thread::sleep_for(5000ms);
        conn->Close();
    
        ioc.join();
    }
    

    With a local demonstration:

    enter image description here

    Summary

    In short, the promise is only there for the end user to synchronize on connection completion. You never block inside handlers.

    Also, interestingly enough, all this was already demonstrated in the TLS-enabled example that you apparently took as the starting point:

    void do_connect(std::promise<void> p) {
        EKA_TRACE_ERROR() << "Connecting to: " << m_ep << std::endl;
        m_socket.next_layer().async_connect(
            m_ep, [this, p = std::move(p), self = shared_from_this()](error_code ec) mutable {
                EKA_TRACE_ERROR() << "Connect: " << ec.message() << std::endl;
                if (ec)
                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                else {
                    m_socket.async_handshake( //
                        Stream::client, [p = std::move(p), self](error_code ec) mutable {
                            EKA_TRACE_ERROR() << "Handshake: " << ec.message() << std::endl;
                            if (ec)
                                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                            else
                                p.set_value();
                        });
                }
            });
    }
    

    This is structurally identical to the DoConnect that we ended up with after all the fixes:

    void UdpConnection::DoConnect(std::promise<void> p) {
        m_resolver.async_resolve(
            udp::v4(), m_to, m_port,
            [this, p = std::move(p), self = shared_from_this()] //
            (error_code ec, udp::resolver::iterator eps) mutable {
                if (ec) {
                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                } else {
                    asio::async_connect( //
                        m_socket, eps,
                        [this, p = std::move(p), self = shared_from_this()] //
                        (error_code ec, udp::resolver::iterator it) mutable {
                            if (ec) {
                                std::cout << "Connect: " << ec.message() << std::endl;
                                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                            } else {
                                m_ep = it->endpoint();
                                std::cout << "Connect: " << m_ep << std::endl;
                                p.set_value();
                            }
                        });
                }
            });
    }