Search code examples
c++multithreadingboosttcpboost-asio

Boost ASIO: Handling synchronous and asynchronous operations in the same class using the same io_context


I'm trying to write a class using Boost ASIO that can create a TCP connection, send and receive data. It should be able to perform all of those operations via method calls asynchronously as well as synchronously (blocking) with a timeout. My goal is to never have the user interact with ASIO whatsoever, including creating an io_context object.

It currently looks approximately like this (without send/receive methods):

class TCPClient {
public:
    TCPClient() {
        deadline_.expires_from_now(std::chrono::duration<int>::max());
        check_deadline();
    }

    // --- Callbacks ---
    using ConnectCallback = std::function<void()>;

    // Synchronous and asynchronous
    auto connect(const std::string& host, int port, int timeout=0) -> void;
    auto connect_async(const std::string& host, int port, ConnectCallback callback) -> void;

    auto close() -> void {
        if(socket_) {
            boost::system::error_code ignored_ec;
            socket_->close(ignored_ec);
        }
    }

private:

    /**
     * Adapted from https://www.boost.org/doc/libs/1_52_0/doc/html/boost_asio/example/timeouts/blocking_tcp_client.cpp
     * Replaced deadline_timer with steady_timer to make it compatible with std::chrono
    */
    auto check_deadline() -> void {
        // Close socket if the timer has expired
        if(deadline_.expires_at() <= std::chrono::steady_clock::now()) {
            close();
            deadline_.expires_from_now(std::chrono::duration<int>::max());
        }

        deadline_.async_wait( [this](const auto& ec) {
            check_deadline();
        });
    }

    auto socket() -> tcp::socket& {
        // Create socket if it doesn't exist
        if(!socket_) {
            socket_ = std::make_unique<tcp::socket>(io_);
        }

        // Open interface 
        if(!socket_->is_open()) {
            socket_->open(tcp::v4());
        }
        return *socket_;
    }

    boost::asio::io_context io_;
    std::unique_ptr<tcp::socket> socket_;
    boost::asio::steady_timer deadline_;
};

Using the deadline concept provided by the documentation of ASIO I managed to get the synchronous blocking connect method to work:

auto TCPClient::connect(const std::string& host, int port, int timeout) -> void {
    // Resolve remote endpoint
    auto query = tcp::resolver::query(host, std::to_string(port));
    auto iter = tcp::resolver(io_).resolve(query);

    // setup deadline
    if(timeout) deadline_.expires_from_now(std::chrono::seconds(timeout));

    // start asynchronous connection
    boost::system::error_code ec = net::error::would_block;     // Never a valid error code if async operation fails
    socket().async_connect( iter->endpoint(), [&ec](const auto& e) {
        ec = e;
    });

    do {
        io_.run_one();
    } while(ec == net::error::would_block);

    if(ec || !socket().is_open()) {
        throw SocketException( get_error_message(ec) );
    }
}

As you can see the blocking version of this method does use asynchronous operations under the hood. The waiting is done by calling io_context.run_one() repeatedly. Now here's my question: What's the best approach to implementing the connect_async method? I thought about starting a thread performing io_context.run() each time an asynchronous operation is called, but I don't seem to get it to work and I think there could be problems if I call synchronous and asynchronous methods together. Or should I maybe toss this approach out of the window and create separate classes for blocking/non blocking operations? As all communication should pass through the same socket, separate io_context objects wouldn't work either.


Solution

  • You can use io_context::run() with io_context::stop() and io_context::restart() repeatedly. However, I would not recommend it.

    In fact, I recommend not tying the IO object to an execution context at all, instead accepting an executor object. That way, multiple TCPClient will be able to share resources and the service thread.

    I mid-way point might be for each TPCClient to contain an io_context instance and a worker thread. For that to work, you need a work-guard to avoid the thread running out of work prematurely.

    Depending of what approach you use, you may need to avoid a data-race in check_deadline by synchronizing access to the object's state. As long as there's only a single service thread and check_deadline is only called from the completion handler it is already safe.

    Review Of connect():

    • You are not applying the timeout to DNS resolution. This seems like it is probably not intentional
    • You are only using the first endpoint from the resolver results. This is also probably not what you want. Use asio::async_connect instead
    • check_deadline is called unconditionally, even when the timer is merely canceled. Also, the timer is not canceled on successful completion. You need to check for timer cancellation
    • You're translating error codes, I can't see how, but make sure you use error_condition instead of comparing error codes. Also, consider just using error_code::message.
    • You're using would_block as a stand-in. Consider just error_code{} which has conditional conversion to bool. Or indeed use optional<error_code> for expressive code
    • You're using dynamica allocation for the socket_. It's unclear why.

    Keeping with your current io_context setup, I'd simplify and fix all of the above by writing the operation to be self-contained:

    #include <boost/asio.hpp>
    #include <iomanip>
    #include <iostream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using namespace std::chrono_literals;
    
    class TCPClient {
      public:
        using error_code      = boost::system::error_code;
        using ConnectCallback = std::function<void()>;
    
        // Synchronous and asynchronous
        void connect(std::string const& host, int port, int timeout = 0);
        void connect_async(std::string const& host, int port, ConnectCallback callback);
    
      private:
        asio::io_context io_;
        tcp::socket      socket_{io_};
    };
    
    void TCPClient::connect(std::string const& host, int port, int timeout) {
        if (io_.stopped())
            io_.restart();
    
        error_code    result;
        tcp::resolver resolver{io_};
    
        // setup optional deadline
        asio::steady_timer deadline{io_, std::chrono::seconds(timeout)};
        if (timeout)
            deadline.async_wait([&](error_code ec) { if (!ec) socket_.cancel(); });
    
        // resolve and connect under timer
        socket_ = tcp::socket(io_);
        resolver.async_resolve(host, std::to_string(port), [&](error_code ec, tcp::resolver::results_type eps) {
            if (!(result = ec)) {
                asio::async_connect(socket_, eps, [&](error_code ec, tcp::endpoint) {
                    if (!(result = ec))
                        deadline.cancel();
                });
            }
        });
    
        io_.run();
    
        if (result.failed() || !socket_.is_open())
            // throw SocketException(result.message());
            throw boost::system::system_error(result);
    }
    
    int main() {
        TCPClient c;
        c.connect("localhost", 8989, 1);
    }