
Boost Asio async client


I'm trying to write an async client using Boost Asio. I wrote the following code:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/beast/http.hpp>
#include <boost/json/src.hpp>
namespace json = boost::json;
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using asio::ip::tcp;
using boost::system::error_code;
using namespace asio::experimental::awaitable_operators;

#include <iostream>
using namespace std;




template <typename T> using Defer = asio::deferred_t::as_default_on_t<T>;
using Socket = Defer<tcp::socket>;
using Acceptor = Defer<tcp::acceptor>;



using boost::asio::ip::tcp;

class TCPClient {
public:
    TCPClient(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator)
        : io_service_(io_service), socket_(io_service) {
        connect(endpoint_iterator);
    }

    void write(const std::string& message) {
        io_service_.post(boost::bind(&TCPClient::do_write, this, message));
    }

    void close() {
        io_service_.post(boost::bind(&TCPClient::do_close, this));
    }

private:
    void connect(tcp::resolver::iterator endpoint_iterator) {
        boost::asio::async_connect(socket_, endpoint_iterator,
            boost::bind(&TCPClient::handle_connect, this,
                boost::asio::placeholders::error));
    }

    void handle_connect(const boost::system::error_code& error) {
        if (!error) {
            cout << "connected" << endl;
            write("Hello, World!");
            read();
        }
        else {
            cout << "connection failed" << endl;
        }
    }

    void do_write(const std::string& message) {
        boost::asio::async_write(socket_,
            boost::asio::buffer(message),
            boost::bind(&TCPClient::handle_write, this,
                boost::asio::placeholders::error));
    }

    void handle_write(const boost::system::error_code& error) {
        if (!error) {
            cout << "write successfully" << endl;
        }
        else {
            cout << "write failed " << error << endl;
        }
    }

    void read() {
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
            boost::bind(&TCPClient::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));

        //socket_.async_read_some(buf.prepare(1024),
        //    boost::bind(&TCPClient::handle_read, this,
        //        boost::asio::placeholders::error,
        //        boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
        if (!error) {
            std::cout << "Received: " << bytes_transferred << std::endl;
            read();
        }
        else {
            // Handle the error
            std::cout << "read failed " << error << std::endl;
        }
    }

    void do_close() {
        socket_.close();
    }

    boost::asio::io_service& io_service_;
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length] = { 0 };
    boost::asio::streambuf buf;
};

int main() {
    boost::asio::io_service io_service;
    tcp::resolver resolver(io_service);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve("localhost", "8989");
    TCPClient client(io_service, endpoint_iterator);
    io_service.run();

    while (1) {}

    return 0;
}

The client connects successfully and the connect handler is called. Using Wireshark I can also see that write() sends the "Hello, World!" message and gets a reply back, but neither the write handler nor the read handler is called.

I tried different things and looked at different sample code, but I couldn't figure it out. I also tried to write the same code using coroutines (below), but I'm new to async sockets and Boost Asio and I'm still not sure what I'm doing (but trying hard to get there):

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>

using boost::asio::ip::tcp;
namespace asio = boost::asio;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;

awaitable<void> connectAndCommunicate() {
    try {
        asio::io_context io_context;
        tcp::socket socket(io_context);

        co_await socket.async_connect(tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 12345), use_awaitable);

        std::string message = "Hello, World!";
        co_await asio::async_write(socket, asio::buffer(message), use_awaitable);

        char data[128];
        std::size_t length = co_await socket.async_read_some(asio::buffer(data), use_awaitable);
        std::cout << "Received: " << std::string(data, length) << std::endl;

    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() {
    asio::io_context io_context;
    co_spawn(io_context, connectAndCommunicate(), detached);
    io_context.run();
    return 0;
}

I would appreciate it if more experienced developers could point out my mistakes and the best way to do what I'm trying to do.

Thanks


Solution

    First Version Review

    1. don't use using namespace
    2. don't use boost::bind, and certainly not the obsolete boost/bind.hpp include
    3. don't include a ton of unneeded headers, drop unused type aliases
    4. use abbreviated namespaces that you have
      • use io_context instead of deprecated io_service
      • don't pass around references to io contexts; instead use executors
    5. I suggest not leaking implementation details in the interface (take host/port as constructor arguments, not some specific kind of iterator)
    6. when you are already on the strand you don't need to post, so in handle_connect call do_write() instead of write()

    Now we're down to 70 lines, from 120: Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    namespace asio = boost::asio;
    using namespace std::placeholders;
    using asio::ip::tcp;
    using boost::system::error_code;
    
    static std::string const hello_message = "Hello World!";
    
    class TCPClient {
      public:
        TCPClient(asio::io_context& ioc, std::string const& host, std::string const& port) : socket_(ioc) {
            tcp::resolver resolver(ioc);
            connect(resolver.resolve(host, port));
        }
    
        void write(std::string const& message) {
            post(socket_.get_executor(), std::bind(&TCPClient::do_write, this, message));
        }
    
        void close() { post(socket_.get_executor(), std::bind(&TCPClient::do_close, this)); }
    
      private:
        void connect(tcp::resolver::iterator endpoint_iterator) {
            async_connect(socket_, endpoint_iterator, std::bind(&TCPClient::handle_connect, this, _1));
        }
    
        void handle_connect(error_code ec) {
            if (!ec) {
                std::cout << "connected" << std::endl;
                do_write(hello_message);
                read();
            } else {
                std::cout << "connection failed (" << ec.message() << ")" << std::endl;
            }
        }
    
        void do_write(std::string const& message) {
            async_write(socket_, asio::buffer(message), std::bind(&TCPClient::handle_write, this, _1));
        }
    
        void handle_write(error_code ec) { std::cout << "write " << ec.message() << std::endl; }
    
        void read() {
            socket_.async_read_some(asio::buffer(data_), std::bind(&TCPClient::handle_read, this, _1, _2));
    
            // socket_.async_read_some(buf.prepare(1024), std::bind(&TCPClient::handle_read, this, _1, _2));
        }
    
        void handle_read(error_code ec, size_t bytes_transferred) {
            std::cout << "Read " << ec.message() << " (" << bytes_transferred << ")" << std::endl;
            if (!ec) {
                std::cout << "Received: " << bytes_transferred << std::endl;
                read();
            } else {
                // Handle the error
            }
        }
    
        void do_close() { socket_.close(); }
    
        tcp::socket            socket_;
        std::array<char, 1024> data_{0};
        // asio::streambuf     buf;
    };
    
    int main() {
        asio::io_context ioc;
        TCPClient        client(ioc, "localhost", "8989");
        ioc.run();
    }
    

    I may have inadvertently fixed a problem that I didn't consciously mention, but to me it looks like it Just Works(TM).

    Second Version Review

    That was a LOT cleaner already! I'd suggest

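    • drop the extra io_context: in your version the socket is bound to a local io_context that nothing ever runs, so its operations can never complete. You're IN a coroutine, so there is by definition a context; get the executor from this_coro
    • use deferred instead of use_awaitable for efficiency
    • don't use raw C arrays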

    To be closer to the first listing:

    • Use the same buffer size
    • Also take the host/port information and resolve it like in the first listing
    • Loop on the reads

    We get: Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/use_awaitable.hpp>
    #include <array>
    #include <iomanip> // std::quoted
    #include <iostream>
    
    using boost::asio::ip::tcp;
    namespace asio = boost::asio;
    
    asio::awaitable<void> connectAndCommunicate(std::string host, std::string port) {
        try {
            auto ex = co_await asio::this_coro::executor;
            auto token = asio::deferred;
    
            tcp::socket s(ex);
            tcp::resolver resolver(ex);
            co_await async_connect(s, co_await resolver.async_resolve(host, port, token), token);
    
            std::string message = "Hello, World!";
            co_await asio::async_write(s, asio::buffer(message), token);
    
            for (;;) {
                std::array<char, 1024> buf;
                auto length = co_await s.async_read_some(asio::buffer(buf), token);
                std::cout << "Received: " << quoted(std::string_view(buf.data(), length)) << std::endl;
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Exception: " << se.code().message() << std::endl;
        }
    }
    
    int main() {
        asio::io_context ioc;
        co_spawn(ioc, connectAndCommunicate("127.0.0.1", "8989"), asio::detached);
        ioc.run();
    }
    

    • I'd consider making deferred the default completion token. Ironically, the first code listing had those type aliases (Defer, Socket, Acceptor) defined where they weren't even relevant :) Live On Coliru

    Loose Ends

    Things glossed over because they weren't part of your question:

    • strands and output message queueing (you don't have multiple output messages anyways now)
    • message framing. async_read_some is a poor method if you need to interpret messages. You will get partial/concatenated messages. This is why composed operations and dynamic buffers exist.
    • error handling. I slightly improved it, but you need to be aware of partial success (especially with composed reads, but perhaps async_read_some can return bytes and EOF all in itself)
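
    To make the message framing point concrete, here is a minimal sketch in plain standard C++ (no Asio) of what reassembly involves when chunks arrive split or merged. It assumes a newline-delimited protocol, which is an assumption of this sketch and not something the question specifies; LineFramer is a hypothetical helper. In practice a composed operation such as asio::async_read_until with a dynamic buffer does the equivalent accumulation for you:

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper: feed it whatever async_read_some delivered and get back
// only the complete '\n'-terminated messages (without the delimiter).
class LineFramer {
  public:
    std::vector<std::string> feed(std::string_view chunk) {
        pending_.append(chunk);

        std::vector<std::string> messages;
        std::size_t start = 0;
        for (std::size_t nl; (nl = pending_.find('\n', start)) != std::string::npos;
             start = nl + 1) {
            messages.emplace_back(pending_, start, nl - start);
        }

        assert(start <= pending_.size());
        pending_.erase(0, start); // keep only the incomplete tail for the next chunk
        return messages;
    }

    // Bytes received so far that do not yet form a complete message.
    std::string const& partial() const { return pending_; }

  private:
    std::string pending_;
};
```

    With this, a read handler would call feed() with the bytes it received and act only on the returned messages. A chunk such as "lo\nWor" arriving after "Hel" yields the single message "Hello" and leaves "Wor" pending.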