I am trying boost asio , the test code is based on asio-wrapper .
Is there any way to prevent data concentration without relaying on std::this_thread::sleep_for
in this case?
This is the test code :
#include<iostream>
#include <vector>
#include<thread>
#include<chrono>
#include "asio_wrapper.hpp"
class WritableHandlerImplServer : public cppeng::tcp::WritableHandler {
public:
void HandleCallback(std::vector<uint8_t> data,
std::shared_ptr<cppeng::tcp::Writable> writable) override {
// Convert void pointer to string
std::string received(data.begin(), data.end());
std::cout << "Server Rx: " << received << std::endl;
}
void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
std::cout << "Server: Connection closed!" << std::endl;
}
};
class WritableHandlerImplClient : public cppeng::tcp::WritableHandler {
public:
void HandleCallback(std::vector<uint8_t> data,
std::shared_ptr<cppeng::tcp::Writable> writable) override {
// Convert void pointer to string
std::string received(data.begin(), data.end());
std::cout << "Client Rx: " << received << std::endl;
rx_flag_ = true;
}
void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
std::cout << "Client: Connection closed!" << std::endl;
}
bool rx_flag_{ false };
};
int main()
{
/*this is a test code*/
try {
WritableHandlerImplServer writable_handler_server;
WritableHandlerImplClient writable_handler_client;
cppeng::tcp::Server server(3000, writable_handler_server);
server.Start();
cppeng::tcp::Client client("127.0.0.1", 3000, writable_handler_client);
client.Start(1000);
client.Write("abcd");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("efgh");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("ijkl");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("mnop");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("qrst");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("uvwx");
std::this_thread::sleep_for(std::chrono::milliseconds(5));
client.Write("yz");
server.Stop();
client.Stop();
}
catch (std::exception& e) {
std::cout << "exception :" << e.what() << "\n";
}
return EXIT_SUCCESS;
}
The Write/Read functions :
void Connection::DoRead() {
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_buffer_.data(), read_buffer_.size()),
boost::asio::transfer_at_least(1),
[this, self](std::error_code ec, std::size_t length) {
if (!ec) {
std::vector<uint8_t> rx_data(read_buffer_.begin(), read_buffer_.begin() + length);
writable_handler_.HandleCallback(std::move(rx_data), shared_from_this());
DoRead();
} else {
Close();
}
});
}
void Connection::DoWrite() {
auto self(shared_from_this());
boost::asio::async_write(socket_,
boost::asio::buffer(write_queue_.front().data(),
write_queue_.front().size()),
[this, self](std::error_code ec, std::size_t /*length*/) {
if (!ec) {
write_queue_.pop();
if (!write_queue_.empty()) {
DoWrite();
} else {
write_busy_ = false;
}
} else {
Close();
}
});
}
Update :
The source author implemented the wrapper write function in a such way that the 2nd message is lost if the boost::asio write function is busy writing!
void Connection::Write(const std::string &data) {
// Add the data to the queue
write_queue_.emplace(data.begin(), data.end());
// Start the write process only if we are not already busy writing
if (!write_busy_) {
write_busy_ = true;
DoWrite();
}
else {
}
}
Assuming that by "data concentration" you mean that packets do not represent "messages": that's correct. TCP is a stream protocol. You get a logical stream. The way the stream is transmitted on the wire is inconsequential.
You need to employ an application protocol to know how to parse the stream. Usually there is framing of messages. Popular choices are delimiters (e.g. NUL bytes) or length-prefixed messages.
In your case it looks as if messages might be 4 octets, meaning you could transfer_exactly(4)
. If not, I'd suggest NUL-delimiting (pick a delimiter that does not interfere with your actual message contents, of course).
It looks like asio_wrapper offers no way to achieve that. It will only help you if framing is not important, or you have to do additional buffering and parsing, which makes your handlers complicated.
I'd suggest not using the wrapper:
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using Message = std::string;
using Handler = std::function<void(Message const& msg)>; // or perhaps an interface
struct Connection : std::enable_shared_from_this<Connection> {
Connection(tcp::socket s, Handler cb) : s_(std::move(s)), cb_(std::move(cb)) {}
void Start() { read_loop(); }
void Write(Message msg) {
post(s_.get_executor(),
[this, self = shared_from_this(), msg = std::move(msg)]() mutable { do_write(std::move(msg)); });
}
void Stop() {
post(s_.get_executor(), [this, self = shared_from_this()]() { s_.cancel(); });
}
private:
tcp::socket s_;
Handler cb_;
Message incoming_;
std::deque<Message> outbox_;
void do_write(Message msg) {
if (!msg.ends_with('\n'))
msg += '\n';
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
write_loop();
}
void read_loop() {
asio::async_read_until( //
s_, asio::dynamic_buffer(incoming_), "\n",
[this, self = shared_from_this()](error_code ec, size_t n) {
if (!ec) {
if (cb_ && n)
cb_(incoming_.substr(0, n - 1)); // exclude '\n'
incoming_.erase(0, n);
read_loop();
}
});
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
s_, asio::buffer(outbox_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
if (!ec) {
outbox_.pop_front();
write_loop();
}
});
}
};
struct Server {
Server(uint16_t port, Handler cb) : acc_(io_, {{}, port}), callback_(std::move(cb)) {}
void Start() {
acc_.listen();
accept_loop();
}
void Stop() {
io_.stop();
// acc_.cancel();
// TODO perhaps keep a list of sessions and stop them?
// io_.join();
}
private:
asio::thread_pool io_{1};
tcp::acceptor acc_;
Handler callback_;
void accept_loop() {
acc_.async_accept(make_strand(acc_.get_executor()), [this](error_code ec, tcp::socket s) {
if (!ec) {
std::make_shared<Connection>(std::move(s), callback_)->Start();
accept_loop();
}
});
}
};
struct Client {
Client(std::string host, uint16_t port, Handler cb)
: spec_(host, std::to_string(port))
, callback_(std::move(cb)) {}
void Start() {
assert(!conn_);
tcp::socket socket_(io_);
asio::connect(socket_, tcp::resolver(io_).resolve(spec_));
conn_ = std::make_shared<Connection>(std::move(socket_), callback_);
conn_->Start();
}
void Write(Message msg) const {
assert(conn_);
conn_->Write(std::move(msg));
}
void Stop() {
if (conn_)
conn_->Stop();
}
private:
asio::thread_pool io_{1};
tcp::resolver::query spec_;
Handler callback_;
std::shared_ptr<Connection> conn_;
};
int main() {
try {
Server server(3000, [](std::string const& msg) { std::cout << "Server Rx: " << msg << std::endl; });
server.Start();
Client client("127.0.0.1", 3000,
[](std::string const& msg) { std::cout << "Client Rx: " << msg << std::endl; });
client.Start();
client.Write("abcd");
client.Write("efgh");
client.Write("ijkl");
client.Write("mnop");
client.Write("qrst");
client.Write("uvwx");
client.Write("yz");
client.Stop();
std::this_thread::sleep_for(5ms);
server.Stop(); // don't stop the server before last message arrived
} catch (std::exception const& e) {
std::cout << "exception :" << e.what() << "\n";
}
}
Printing
Server Rx: abcd
Server Rx: efgh
Server Rx: ijkl
Server Rx: mnop
Server Rx: qrst
Server Rx: uvwx
Server Rx: yz