I'm trying to implement a simple IPC protocol for a project that will be built using Boost ASIO. The idea is to have the communication be done through IP/TCP, with a server with the backend and a client that will be using the data received from the server to build the frontend. The whole session would go like this:
propertiesPacket
)processedData
n
bytes)n
byte packetI tried implementing this by myself, following the great tutorial available in Boost ASIO's documentation, as well as the examples included in the library and some repos I found on Github, but as this is my first hand working with networking and IPC, I couldn't make it work, my client returns an exception saying the connection was reset by the peer.
What I have right now is this:
// File client.cpp
int main(int argc, char *argv[])
{
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
boost::asio::io_context io;
boost::asio::ip::tcp::socket socket(io);
boost::asio::ip::tcp::resolver resolver(io);
boost::asio::connect(socket, resolver.resolve(argv[1], argv[2]));
boost::asio::write(socket, boost::asio::buffer(&properties, sizeof(propertiesPacket)));
unsigned short responseSize {};
boost::asio::read(socket, boost::asio::buffer(&responseSize, sizeof(short)));
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
boost::asio::read(socket, boost::asio::buffer(response, responseSize));
// ...
// The client handles the data
// ...
return 0;
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
}
// File server.cpp
class ServerConnection
: public std::enable_shared_from_this<ServerConnection>
{
public:
using TCPSocket = boost::asio::ip::tcp::socket;
ServerConnection::ServerConnection(TCPSocket socket)
: socket_(std::move(socket)),
properties_(nullptr),
filePacket_(nullptr),
filePacketSize_(0)
{
}
void start() { doRead(); }
private:
void doRead()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(properties_, sizeof(propertiesPacket)),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) {
processData();
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
}
});
}
void doWrite(void* data, size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) { doRead(); }
});
}
void processData()
{ /* Data is processed */ }
TCPSocket socket_;
propertiesPacket* properties_;
processedData* filePacket_;
short filePacketSize_;
};
class Server
{
public:
using IOContext = boost::asio::io_context;
using TCPSocket = boost::asio::ip::tcp::socket;
using TCPAcceptor = boost::asio::ip::tcp::acceptor;
Server::Server(IOContext& io, short port)
: socket_(io),
acceptor_(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
doAccept();
}
private:
void doAccept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec) {
std::make_shared<ServerConnection>(std::move(socket_))->start();
}
doAccept();
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
What did I do wrong? My guess is that inside the doRead
function, calling multiple times the doWrite
function, when that function then also calls doRead
is in part what's causing problems, but I don't know what the correct way of writing data asynchronously multiple times is. But I'm also sure that isn't the only part of my code that isn't behaving as I think it should.
Besides the problems with the code shown that I mentioned in the comments, there is indeed the problem that you suspected:
My guess is that inside the doRead function, calling multiple times the doWrite function, when that function then also calls doRead is in part what's causing problems
The fact that "doRead" is in the same function isn't necessarily a problem (that's just full-duplex socket IO). However "calling multiple times" is. See the docs:
This operation is implemented in terms of zero or more calls to the stream's
async_write_some
function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such asasync_write
, the stream'sasync_write_some
function, or any other composed operations that perform writes) until this operation completes.
The usual way is to put the whole message in a single buffer, but if that would be "expensive" to copy, you can use a BufferSequence, which is known as scatter/gather buffers.
Specifically, you would replace
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
with something like
std::vector<boost::asio::const_buffer> msg{
boost::asio::buffer(&filePacketSize_, sizeof(short)),
boost::asio::buffer(filePacket_, sizeof(*filePacket_)),
};
doWrite(msg);
Note that this assumes that
filePacketSize
andfilePacket
have been assigned proper values!
You could of course modify do_write
to accept the buffer sequence:
template <typename Buffers> void doWrite(Buffers msg)
{
auto self(shared_from_this());
boost::asio::async_write(
socket_, msg,
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
doRead();
}
});
}
But in your case I'd simplify by inlining the body (now that you don't call it more than once anyway).
Don't use new
or delete
. NEVER use malloc
in C++. Never use reinterpret_cast<>
(except in the very rarest of exceptions that the standard allows!). Instead of
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
Just use
processedData response;
(optionally add {}
for value-initialization of aggregates). If you need variable-length messages, consider to put a vector or a array<char, MAXLEN> inside the message. Of course, array is fixed length but it preserves POD-ness, so it might be easier to work with. If you use vector, you'd want a scatter/gather read into a buffer sequence like I showed above for the write side.
Instead of reinterpreting between inconsistent short
and unsigned short
types, perhaps just spell the type with the standard sizes: std::uint16_t
everywhere.
Keep in mind that you are not taking into account byte order so your protocol will NOT be portable across compilers/architectures.
This is the listing I ended up with after reviewing the code you shared.
#include <boost/asio.hpp>
#include <iostream>
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using TCPSocket = tcp::socket;
struct processedData { };
struct propertiesPacket { };
// File server.cpp
class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
public:
ServerConnection(TCPSocket socket) : socket_(std::move(socket))
{ }
void start() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
doRead();
}
private:
void doRead()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
auto self(shared_from_this());
socket_.async_read_some(
ba::buffer(&properties_, sizeof(properties_)),
[this, self](error_code ec, std::size_t length) {
std::clog << "received: " << length << std::endl;
if (!ec) {
processData();
std::vector<ba::const_buffer> msg{
ba::buffer(&filePacketSize_, sizeof(uint16_t)),
ba::buffer(&filePacket_, filePacketSize_),
};
ba::async_write(socket_, msg,
[this, self = shared_from_this()](
error_code ec, std::size_t length) {
std::clog << " written: " << length
<< std::endl;
if (!ec) {
doRead();
}
});
}
});
}
void processData() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
/* Data is processed */
}
TCPSocket socket_;
propertiesPacket properties_{};
processedData filePacket_{};
uint16_t filePacketSize_ = sizeof(filePacket_);
};
class Server
{
public:
using IOContext = ba::io_context;
using TCPAcceptor = tcp::acceptor;
Server(IOContext& io, uint16_t port)
: socket_(io)
, acceptor_(io, {tcp::v4(), port})
{
doAccept();
}
private:
void doAccept()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::clog << "Accepted " << socket_.remote_endpoint()
<< std::endl;
std::make_shared<ServerConnection>(std::move(socket_))->start();
doAccept();
} else {
std::clog << "Accept " << ec.message() << std::endl;
}
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
// File client.cpp
int main(int argc, char *argv[])
{
ba::io_context io;
Server s{io, 6869};
std::thread server_thread{[&io] {
io.run();
}};
// always check argc!
std::vector<std::string> args(argv, argv + argc);
if (args.size() == 1)
args = {"demo", "127.0.0.1", "6869"};
// avoid race with server accept thread
post(io, [&io, args] {
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
tcp::socket socket(io);
tcp::resolver resolver(io);
connect(socket, resolver.resolve(args.at(1), args.at(2)));
write(socket, ba::buffer(&properties, sizeof(properties)));
uint16_t responseSize{};
ba::read(socket, ba::buffer(&responseSize, sizeof(uint16_t)));
std::clog << "Client responseSize: " << responseSize << std::endl;
processedData response{};
assert(responseSize <= sizeof(response));
ba::read(socket, ba::buffer(&response, responseSize));
// ...
// The client handles the data
// ...
// for online demo:
io.stop();
} catch (std::exception const& e) {
std::clog << e.what() << std::endl;
}
});
io.run_one();
server_thread.join();
}
Printing something similar to
void Server::doAccept()
Server::doAccept()::<lambda(boost::system::error_code)> Success
void ServerConnection::start()
void ServerConnection::doRead()
void Server::doAccept()
received: 1
void ServerConnection::processData()
written: 3
void ServerConnection::doRead()
Client responseSize: 1