Assume the objective is to test a synchronous TCP client using an asynchronous TCP server on localhost.
Now consider how the client and the server interact in the following test case:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
/// Server-side state for one accepted connection of the echo server.
///
/// A session owns the connected socket and alternates between reading a chunk
/// from the peer (do_read) and writing exactly that chunk back (do_write).
/// Every pending asynchronous operation captures a shared_ptr to the session,
/// so instances must be created through std::make_shared. When an operation
/// fails, no further operation is started.
class session : public std::enable_shared_from_this<session>
{
public:
    /// Takes ownership of an already connected socket.
    explicit session(tcp::socket socket) : socket_(std::move(socket)) {}

    /// Starts the read/echo cycle by issuing the first asynchronous read.
    void start() { do_read(); }

private:
    /// Issues one asynchronous read of at most max_length bytes into data_.
    /// On success the received bytes are echoed back via do_write().
    void do_read()
    {
        std::cout << "Server reads.\n";
        auto self(shared_from_this());
        socket_.async_read_some(
            boost::asio::buffer(data_, max_length),
            [this, self](boost::system::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    do_write(length);
                }
            });
    }

    /// Logs and sends back the first `length` bytes of data_, then reads again.
    ///
    /// @param length number of valid bytes in data_ (as reported by the read).
    void do_write(std::size_t length)
    {
        std::cout << "Server writes: ";
        // Print only the bytes that were actually received: the rest of data_
        // is uninitialized (or left over from an earlier, longer message).
        std::cout.write(data_, static_cast<std::streamsize>(length));
        std::cout << "\n";
        auto self(shared_from_this());
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(data_, length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                    do_read();
                }
            });
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length]; // receive buffer; only the first `length` bytes of a read are valid
};
class EchoSever
{
public:
EchoSever(boost::asio::io_context& io_context, boost::asio::ip::address ipAddress, short port)
: acceptor_(io_context, tcp::endpoint(ipAddress, port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::cout << "client connected.\n";
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main()
{
const auto ipAddress{boost::asio::ip::make_address("127.0.0.1")};
const short port{8080};
boost::asio::io_context io_context;
EchoSever echoServer(io_context, ipAddress, port);
auto waiter = std::async(std::launch::async, [&]()
{
io_context.run();
std::cout << "echoServer has finished\n";
});
// client code:
boost::asio::ip::tcp::socket socket(io_context);
boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));
const std::string requestMessage{"test123"};
boost::asio::write(socket, boost::asio::buffer(requestMessage, requestMessage.size()));
std::cout << "client wrote request: " << requestMessage << "\n";
char reply[1024];
size_t reply_length = boost::asio::read(socket, boost::asio::buffer(reply, requestMessage.size()));
std::cout << "reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
io_context.stop();
waiter.wait();
}
The desired behavior would, in order, look like this:
Running the code both on Godbolt and on my box gave different results across multiple runs, so there must be at least one synchronization issue that I am trying to understand.
As far as I understand it, calling `io_context.run()` in a separate thread lets the completion handlers associated with the `io_context` instance run in that thread. As long as these completion handlers operate on distinct objects, no synchronization is needed. As far as I can see, they operate on distinct socket objects in the example above, so this is okay.
When the client calls `boost::asio::write`, it is clear that the server already has a socket that is associated with the given endpoint; otherwise the client's call to `boost::asio::connect` would have failed earlier. Afterwards, the server's call to `start()` seems to race with the client calling `boost::asio::read`, i.e. it looks like `io_context.stop();` may be reached before `do_read` is even invoked. This is a bit surprising, since I expected the client's `boost::asio::read` to block until data has arrived. As far as I can see, this would happen even if the completion handlers ran in one strand.
What is wrong in my picture?
What must the client look like to achieve the desired behavior?
`std::async` isn't as explicit about threading as I'd like. I'd prefer just `std::thread`.
That said, there's another problem that might trip you up: `asio::read` is not only blocking, but unless - as you have done - you limit the buffer to exactly the number of bytes to be received, the call will block "indefinitely" until either (A) the buffer has been filled completely, or (B) an error such as EOF occurs (e.g. because the peer closed the connection).
B will never happen in your test. This is where you would either use `read_some` or have the server pro-actively close sessions.
Last but not least: is there a race where the server may not be listening at the time the client `connect`s? I don't think there is, because `EchoServer`'s constructor opens the acceptor and initiates the asynchronous accept. Even though the service is not yet being run, this means that the kernel should be queueing incoming IP packets for that socket.
I made many improvements in the listing below (e.g. only printing the received bytes instead of the entire buffer, quoting them, tracing with timestamps, thread ids and exclusive access to the console, error reporting and handling (e.g. EOF), using `loopback()` instead of... parsing the string "127.0.0.1", making the port unsigned, using a thread explicitly over `std::async`, not manually overriding buffer sizes in `asio::buffer()` calls, canceling the listener instead of hard-stopping the io context, possibly more that I forget).
Live On Coliru
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <thread>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
namespace {

/// Milliseconds elapsed since the first call to timestamp() in this process.
///
/// Measured with std::chrono::steady_clock so the value never decreases, even
/// if the system (wall) clock is adjusted while the program runs.
///
/// @return elapsed time in (fractional) milliseconds; ~0 on the first call.
double timestamp() {
    using clock = std::chrono::steady_clock;
    static const clock::time_point start = clock::now(); // fixed on first call
    const std::chrono::duration<double, std::milli> elapsed = clock::now() - start;
    return elapsed.count();
}

/// Generator for small sequential thread ids (0, 1, 2, ...).
std::atomic_int tid_gen{0};

/// Id of the current thread, drawn from tid_gen when the thread initializes
/// its copy of this variable.
thread_local int tid = tid_gen++;

/// Guards std::cout so that lines written by different threads do not interleave.
std::mutex console_mx;

/// Writes one trace line: "T:<tid><timestamp, width 10>ms " followed by all
/// arguments streamed in order, terminated by a newline.
///
/// @param args values to stream; each must support operator<< on std::ostream.
template <typename... Args>
void trace(Args const&... args) {
    std::lock_guard<std::mutex> lk(console_mx);
    std::cout << "T:" << tid << std::right << std::setw(10) << timestamp() << "ms ";
    // std::endl (flush) on purpose: each line should become visible right
    // away so the output of the two threads can be compared in time order.
    (std::cout << ... << args) << std::endl;
}

} // namespace
class Session : public std::enable_shared_from_this<Session> {
public:
Session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() { do_read(); }
private:
void do_read() {
trace("Server reads.");
auto self = shared_from_this();
socket_.async_read_some(asio::buffer(data_), [this, self](error_code ec, size_t length) {
trace("Session read: ", ec.message(), " ", length);
if (!ec || (length && ec == asio::error::eof))
do_write(length);
});
}
void do_write(size_t length) {
trace("Server writes: ", quoted(std::string_view(data_.data(), length)));
auto self = shared_from_this();
async_write(socket_, asio::buffer(data_, length), [this, self](error_code ec, size_t length) {
trace("Session write: ", ec.message(), " ", length);
if (!ec)
do_read();
});
}
tcp::socket socket_;
std::array<char, 1024> data_;
};
class EchoServer {
public:
EchoServer(asio::any_io_executor ex, asio::ip::address ipAddress, short port)
: acceptor_(ex, tcp::endpoint(ipAddress, port)) {
do_accept();
}
void stop() {
post(acceptor_.get_executor(), [this] {
acceptor_.cancel(); /* or close() */
});
}
private:
void do_accept() {
acceptor_.async_accept( //
make_strand(acceptor_.get_executor()), //
[this](error_code ec, tcp::socket socket) {
if (ec)
trace("Accept: ", ec.message());
if (!ec) {
trace("Accepted ", socket.remote_endpoint());
std::make_shared<Session>(std::move(socket))->start();
do_accept();
}
});
}
tcp::acceptor acceptor_;
};
int main() {
std::cout << std::fixed << std::setprecision(5);
trace("Main start");
auto const ipAddress = asio::ip::address_v4::loopback();
uint16_t const port = 8080;
asio::io_context io_context;
EchoServer echoServer(io_context.get_executor(), ipAddress, port);
auto io_thread = std::thread([&]() {
io_context.run();
trace("Service has finished");
});
{
// client code:
tcp::socket socket(io_context);
tcp::resolver resolver(io_context);
connect(socket, resolver.resolve(ipAddress.to_string(), std::to_string(port)));
{
std::string const request{"test123"};
write(socket, asio::buffer(request));
trace("Main client wrote request: ", quoted(request));
}
{
std::array<char, 1024> reply;
size_t n = socket.read_some(asio::buffer(reply));
trace("Main reply is: ", quoted(std::string_view(reply.data(), n)));
}
} // destructor closes connection
echoServer.stop();
trace("Main service stopped, waiting");
io_thread.join();
trace("Main bye");
}
Prints e.g.
T:0 0.00171ms Main start
T:0 0.36595ms Main client wrote request: "test123"
T:1 0.42401ms Accepted 127.0.0.1:48852
T:1 0.47220ms Server reads.
T:1 0.53624ms Session read: Success 7
T:1 0.55286ms Server writes: "test123"
T:1 0.60766ms Session write: Success 7
T:1 0.63068ms Server reads.
T:0 0.65522ms Main reply is: "test123"
T:0 0.70585ms Main service stopped, waiting
T:1 0.73496ms Session read: End of file 0
T:1 0.80807ms Accept: Operation canceled
T:1 0.82992ms Service has finished
T:0 0.87840ms Main bye
Or many runs locally:
Let me know whether you can still reproduce the behaviour that you didn't fully describe.