
Boost Asio synchronization between tcp client and server


Assume the objective is to test a synchronous TCP client against an asynchronous TCP server on localhost.

Now consider how client and server interact in the following test case:

#include <cstdlib>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class session : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}
    void start() { do_read(); }

private:
    void do_read()
    {
        std::cout << "Server reads.\n";
        auto self(shared_from_this());
        socket_.async_read_some(
            boost::asio::buffer(data_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    do_write(length);
                }
            });
    }

    void do_write(std::size_t length)
    {
        std::cout << "Server writes: ";
        for (auto i : data_) std::cout << i; // note: prints the whole buffer, not just `length` bytes
        std::cout << "\n";
        auto self(shared_from_this());
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(data_, length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                    do_read();
                }
            });
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class EchoServer
{
public:
  EchoServer(boost::asio::io_context& io_context, boost::asio::ip::address ipAddress, short port)
  : acceptor_(io_context, tcp::endpoint(ipAddress, port))
  {
      do_accept();
  }

private:
    void do_accept()
    {
        acceptor_.async_accept(
        [this](boost::system::error_code ec, tcp::socket socket)
        {
            if (!ec)
            {
                std::cout << "client connected.\n";
                std::make_shared<session>(std::move(socket))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

int main()
{
    const auto ipAddress{boost::asio::ip::make_address("127.0.0.1")};
    const short port{8080};

    boost::asio::io_context io_context;
    EchoServer echoServer(io_context, ipAddress, port);

    auto waiter = std::async(std::launch::async, [&]()
    {
        io_context.run();
        std::cout << "echoServer has finished\n";
    });

    // client code:
    boost::asio::ip::tcp::socket socket(io_context);
    boost::asio::ip::tcp::resolver resolver(io_context);
    boost::asio::connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));

    const std::string requestMessage{"test123"};
    boost::asio::write(socket, boost::asio::buffer(requestMessage, requestMessage.size()));
    std::cout << "client wrote request: " << requestMessage << "\n";

    char reply[1024];
    size_t reply_length = boost::asio::read(socket, boost::asio::buffer(reply, requestMessage.size()));
    std::cout << "reply is: ";
    std::cout.write(reply, reply_length);
    std::cout << "\n";

    io_context.stop();
    waiter.wait();
}

The desired behavior, in order, would be:

  1. The server instance asynchronously waits on clients connecting.
  2. The client connects and writes the requestMessage while the server asynchronously reads and writes back.
  3. The client blocks until the reply arrives.

Running the code both on godbolt and on my box gave different results across multiple runs, so there must be at least a synchronization issue that I am trying to understand.

As far as I understand it, calling io_context.run() in a separate thread lets the completion handlers associated with that io_context instance run in that thread. As long as these completion handlers operate on distinct objects, no synchronization is needed. As far as I can see, they operate on distinct socket objects in the example above, so this is fine.

When the client calls boost::asio::write, it is clear that the server already has a socket associated with the given endpoint; otherwise the client's call to boost::asio::connect would have failed earlier. Afterwards, the server's call to start() seems to race with the client calling boost::asio::read, i.e. it looks like io_context.stop(); may be reached before do_read is even invoked. This is a bit surprising, since I expected the client's boost::asio::read to block until data has arrived. As far as I can see, this would happen even if the completion handlers ran on one strand.

What is wrong in my picture?

What must the client look like to achieve the desired behavior?


Solution

  • std::async isn't as explicit about threading as I'd like. I'd prefer just std::thread.

    That said, there's another problem that might trip you up: asio::read is not only blocking, but unless you limit the buffer to exactly the number of bytes to be received (as you have done), the call will block "indefinitely" until either

    • (A) the buffer is full
    • (B) an error occurred (e.g. the server hangs up the connection)

    (B) will never happen in your test. This is where you would either use read_some or have the server proactively close sessions; see the sketch below.
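
    For reference, a minimal sketch of the two client-side alternatives (the helper read_reply and its expected parameter are made up for illustration):

    #include <boost/asio.hpp>
    #include <cstddef>

    using boost::asio::ip::tcp;

    // Sketch: two ways for a synchronous client to bound its read.
    std::size_t read_reply(tcp::socket& socket, std::size_t expected) {
        char reply[1024];

        // (1) Block until exactly `expected` bytes have arrived (the question
        //     achieves this implicitly by sizing the buffer to the request):
        std::size_t n = boost::asio::read(socket, boost::asio::buffer(reply),
                                          boost::asio::transfer_exactly(expected));

        // (2) Alternatively, return as soon as *some* bytes are available,
        //     possibly fewer than the full message:
        // std::size_t n = socket.read_some(boost::asio::buffer(reply));

        return n;
    }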

    Last but not least: is there a race where the server may not yet be listening at the time the client connects? I don't think there is, because EchoServer's constructor opens the acceptor and initiates the asynchronous accept. Even though the service is not yet being run, this means the kernel will already queue incoming connections for that listening socket.
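
    A standalone sketch of that point (port 8080 as in the question; the assert is only illustrative):

    #include <boost/asio.hpp>
    #include <cassert>

    using boost::asio::ip::tcp;

    int main() {
        boost::asio::io_context ctx;

        // The endpoint-taking constructor opens, binds and listens right away:
        tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 8080));

        // No thread runs ctx yet, but the OS already queues incoming
        // connections in the listen backlog; async_accept's completion
        // handler picks them up once ctx.run() executes.
        assert(acceptor.is_open());
    }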

    Reviewed Listing

    There were many improvements in this listing, e.g.:

    • only printing the received bytes instead of the entire buffer, and quoting them
    • tracing with timestamps, thread ids, and exclusive access to the console
    • error reporting and handling (e.g. EOF)
    • using loopback() instead of... parsing the string "127.0.0.1"
    • making the port unsigned
    • using a thread explicitly over std::async
    • not manually overriding buffer sizes in asio::buffer() calls (see the sketch below)
    • canceling the listener instead of hard-stopping the io context
    • possibly more that I forget
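
    For the asio::buffer() point, a short sketch (send_request is a hypothetical helper):

    #include <boost/asio.hpp>
    #include <string>

    namespace asio = boost::asio;
    using asio::ip::tcp;

    // asio::buffer deduces the size from the container, so passing it
    // explicitly is redundant and invites mismatches:
    void send_request(tcp::socket& socket, std::string const& request) {
        write(socket, asio::buffer(request)); // size deduced from request
        // vs. the question's style, where a wrong size would silently truncate:
        // write(socket, asio::buffer(request, request.size()));
    }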

    Live On Coliru

    #include <boost/asio.hpp>
    #include <array>
    #include <atomic>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <string_view>
    #include <thread>
    
    namespace asio = boost::asio;
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    
    namespace {
        static auto timestamp() {
            auto now = std::chrono::high_resolution_clock::now;
            using namespace std::chrono_literals;
            static auto start = now();
            auto        t     = now();
    
            return (t - start) / 1.ms;
        }
    
        static std::atomic_int tid_gen = 0;
        thread_local int       tid     = tid_gen++;
        std::mutex             console_mx;
    
        void trace(auto const&... args) {
            std::lock_guard lk(console_mx);
            ((std::cout << "T:" << tid << std::right << std::setw(10) << timestamp() << "ms ") << ... << args)
                << std::endl;
        }
    } // namespace
    
    class Session : public std::enable_shared_from_this<Session> {
      public:
        Session(tcp::socket socket) : socket_(std::move(socket)) {}
        void start() { do_read(); }
    
      private:
        void do_read() {
            trace("Server reads.");
            auto self = shared_from_this();
            socket_.async_read_some(asio::buffer(data_), [this, self](error_code ec, size_t length) {
                trace("Session read: ", ec.message(), " ", length);
                if (!ec || (length && ec == asio::error::eof))
                    do_write(length);
            });
        }
    
        void do_write(size_t length) {
            trace("Server writes: ", quoted(std::string_view(data_.data(), length)));
            auto self = shared_from_this();
            async_write(socket_, asio::buffer(data_, length), [this, self](error_code ec, size_t length) {
                trace("Session write: ", ec.message(), " ", length);
                if (!ec)
                    do_read();
            });
        }
    
        tcp::socket socket_;
        std::array<char, 1024> data_;
    };
    
    class EchoServer {
      public:
        EchoServer(asio::any_io_executor ex, asio::ip::address ipAddress, short port)
            : acceptor_(ex, tcp::endpoint(ipAddress, port)) {
            do_accept();
        }
    
        void stop() {
            post(acceptor_.get_executor(), [this] {
                acceptor_.cancel(); /* or close() */
            });
        }
    
      private:
        void do_accept() {
            acceptor_.async_accept(                    //
                make_strand(acceptor_.get_executor()), //
                [this](error_code ec, tcp::socket socket) {
                    if (ec)
                        trace("Accept: ", ec.message());
                    if (!ec) {
                        trace("Accepted ", socket.remote_endpoint());
                        std::make_shared<Session>(std::move(socket))->start();
    
                        do_accept();
                    }
                });
        }
    
        tcp::acceptor acceptor_;
    };
    
    int main() {
        std::cout << std::fixed << std::setprecision(5);
    
        trace("Main start");
        auto const     ipAddress = asio::ip::address_v4::loopback();
        uint16_t const port      = 8080;
    
        asio::io_context io_context;
        EchoServer       echoServer(io_context.get_executor(), ipAddress, port);
    
        auto io_thread = std::thread([&]() {
            io_context.run();
            trace("Service has finished");
        });
    
        {
            // client code:
            tcp::socket socket(io_context);
    
            tcp::resolver resolver(io_context);
            connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));
    
            {
                std::string const request{"test123"};
                write(socket, asio::buffer(request));
                trace("Main client wrote request: ", quoted(request));
            }
    
            {
                std::array<char, 1024> reply;
                size_t n = socket.read_some(asio::buffer(reply));
                trace("Main reply is: ", quoted(std::string_view(reply.data(), n)));
            }
        } // destructor closes connection
    
        echoServer.stop();
        trace("Main service stopped, waiting");
        io_thread.join();
        trace("Main bye");
    }
    

    Prints e.g.

    T:0   0.00171ms Main start
    T:0   0.36595ms Main client wrote request: "test123"
    T:1   0.42401ms Accepted 127.0.0.1:48852
    T:1   0.47220ms Server reads.
    T:1   0.53624ms Session read: Success 7
    T:1   0.55286ms Server writes: "test123"
    T:1   0.60766ms Session write: Success 7
    T:1   0.63068ms Server reads.
    T:0   0.65522ms Main reply is: "test123"
    T:0   0.70585ms Main service stopped, waiting
    T:1   0.73496ms Session read: End of file 0
    T:1   0.80807ms Accept: Operation canceled
    T:1   0.82992ms Service has finished
    T:0   0.87840ms Main bye
    

    Or many runs locally:

    [screenshot: output of many consecutive local runs]

    Let me know whether you can still reproduce the behaviour that you didn't fully describe.