I've got a thread that listens to a socket, reads incoming packets, processes them, and sends them to another socket.
I'd like to accelerate this flow by adding a dedicated thread for packet processing, without changing the ordering of the packets.
I've defined a thread that runs the io_context for all socket activities:
std::thread th([&] {
    log_info("reading packets from sockets thread");
    boost::asio::steady_timer timer(read_ioc_);
    timer.expires_after(std::chrono::hours(1));
    timer.async_wait(
        []([[maybe_unused]] const boost::system::error_code& error) {
            log("got timeout");
        });
    read_ioc_.run();
    log_info("thread stopped");
});

std::thread th2([&] {
    log_info("processing packets");
    boost::asio::steady_timer timer(process_ioc_);
    timer.expires_after(std::chrono::hours(1));
    timer.async_wait(
        []([[maybe_unused]] const boost::system::error_code& error) {
            log("got timeout");
        });
    process_ioc_.run();
    log_info("processing stopped");
});

th.detach();
th2.detach();
In the class that handles both packet reading and packet processing, I take the two io_context objects and create a socket and a strand from them. The socket is used to continuously read received packets (or wait for a new packet to reach the socket). The process io_context is used to process the packets in order (say, encrypting each packet and adding an additional header, like some tunnel protocols do):
myClass::myClass(boost::asio::io_context& read_ioc, boost::asio::io_context& process_ioc)
    : socket_(read_ioc), strand_(process_ioc) ...
void myClass::startReadPacketLoop() {
    ...
    readPacketLoop();
    ...
}
This is the loop where I enqueue the packet I just read and go for the next read.
My question is whether I can maintain the packet ordering but have multiple threads handle the packets simultaneously. Can I implement it using a shared queue, or by replacing the strand with ioc_process? How will I preserve ordering in this case (say I write the packets to another socket and I want them to be in order)?
void myClass::readPacketLoop(std::shared_ptr<Packet> pkt,
                             const boost::system::error_code& error,
                             size_t bytes_transferred) {
    std::shared_ptr<Packet> next = freeList.alloc();
    socket_.async_receive(
        boost::asio::buffer(next->buffer(), next->buffer_size()),
        std::bind(&myClass::readPacketLoop, this, next,
                  std::placeholders::_1, std::placeholders::_2));

    boost::asio::post(strand_, [=] {
        process_packet(pkt, error, bytes_transferred);
    });
}
I'll walk through the issues I see, in the order I encountered them while reading your question.
It looks as if your threads exist for the sole purpose of timing one hour. The whole purpose of Asio is to allow asynchronous operations (such as timers) without the need for threads. Also, since you will have other asynchronous work pending, the timers' completions will have no effect at all.
Assuming you really meant to stop processing after a timeout, simplify:
struct myClass {
    myClass(asio::io_context& read_ioc, asio::thread_pool& process_ioc)
        : socket_(read_ioc)
        , strand_(process_ioc.get_executor()) {}

    void startReadPacketLoop() {
        log_info("reading packets from sockets thread");
        //...
        if (!ec)
            startReadPacketLoop();
        //...
    }

  private:
    tcp::socket socket_;
    asio::strand<asio::thread_pool::executor_type> strand_;
};

int main() {
    asio::thread_pool process_ioc_(10);
    asio::io_context  read_ioc_;

    myClass mc(read_ioc_, process_ioc_);
    mc.startReadPacketLoop();

    read_ioc_.run_for(1h);
}
Firstly, as you probably understand, the code

boost::asio::post(strand_, [=] {
    process_packet(pkt, error, bytes_transferred);
});

guarantees that all process_packet calls will be executed single-threaded: the strand executor makes sure that no two tasks execute concurrently. That is exactly not what you want.
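To make that concrete, here is a minimal sketch (not from your code) of the serialization a strand provides: even with four pool threads, the unsynchronized counter is safe, because no two tasks posted to the strand ever run concurrently.

#include <boost/asio.hpp>
#include <iostream>
namespace asio = boost::asio;

int main() {
    asio::thread_pool pool(4); // four threads, but...
    asio::strand<asio::thread_pool::executor_type> strand(pool.get_executor());

    int counter = 0; // ...no mutex needed: the strand serializes every task
    for (int i = 0; i < 1000; ++i)
        post(strand, [&counter] { ++counter; });

    pool.join();
    std::cout << counter << std::endl; // always prints 1000
}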
Q. My question is whether I can maintain the packet ordering but have multiple threads to handle the packets simultaneously.
Yes. Just keep in mind the order in which processing results become available may not be input order.
Q. can I implement it using shared queue
Yes. This is the typical application of a FIFO queue. Note that an execution context is a kind of task queue, so you're right to think that maybe you don't need anything non-Asio.
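For illustration, here is a minimal hand-rolled FIFO sketch (a hypothetical shared_queue helper, not part of Asio). Multiple workers can pop from it concurrently, but note that a shared input queue alone does not preserve output order; results still have to be re-sequenced downstream (see below).

#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

template <typename T>
class shared_queue {
    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool closed_ = false;

  public:
    void push(T item) {
        {
            std::lock_guard lk(mx_);
            items_.push_back(std::move(item));
        }
        cv_.notify_one();
    }

    void close() { // wake all workers so they can drain and exit
        {
            std::lock_guard lk(mx_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    std::optional<T> pop() { // blocks; nullopt once closed and drained
        std::unique_lock lk(mx_);
        cv_.wait(lk, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return std::nullopt;
        T item = std::move(items_.front());
        items_.pop_front();
        return item;
    }
};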
Q. or replace the strand with ioc_process? how will I preserve ordering in this case (let's say I write the packets to another socket and I want them to be in order)
If you logically want to write each packet to the output after processing, you can do exactly that (in synchronous pseudo code, for readability):
while (true) {
    packet request = receive_packet();

    future<packet> response = process(request); // on the thread pool

    post(io_strand, [f = std::move(response)] {
        send_packet(f.get()); // may block until response ready
    });
}
Note that here you unconditionally post the output operations, in sequential order, to a strand. Of course, you may have to wait for the next output packet to become ready. This is known as head-of-line (HOL) blocking.

A big point of concern is that whenever (potentially) blocking operations (like future::wait/get) are performed on an execution context, it may lead to deadlocks.
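A contrived sketch of that hazard, assuming a pool with a single thread: the outer task blocks on a future whose producer is queued behind it on the same pool, so neither can make progress.

#include <boost/asio.hpp>
#include <future>
#include <iostream>
namespace asio = boost::asio;

int main() {
    asio::thread_pool pool(1); // one thread makes the hazard deterministic

    post(pool, [&pool] {
        std::packaged_task<int()> task([] { return 42; });
        std::future<int> f = post(pool, std::move(task)); // queued behind us
        std::cout << f.get() << std::endl; // blocks the only thread: deadlock
    });

    pool.join(); // never returns
}

With more threads the same deadlock merely takes as many simultaneous blocking waits as there are pool threads.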
You may want to switch your design to passing executors rather than references to specific types of execution contexts or strands thereof. It decouples your code better. I will show this approach in the demo.
Let's create a very simple implementation that demonstrates this:
#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::literals;
using namespace std::placeholders;
namespace /*anon*/ {
    static std::mutex mx;
    auto now = std::chrono::steady_clock::now;
    static auto const start = now();

    static inline void log_info(auto const&... args) {
        static std::atomic_uint tid_gen{};
        thread_local unsigned const tid = tid_gen++;

        std::lock_guard lk(mx); // avoid interleaved console output
        std::cout << std::setw(10) << (now() - start) / 1ms << "ms tid=" << tid << " ";
        (std::cout << ... << args) << std::endl;
    }

    static void check(error_code const& ec, [[maybe_unused]] std::string_view where) {
        log_info(where, ": ", ec.message());
        if (ec.failed() && ec != asio::error::eof)
            throw boost::system::system_error(ec);
    }
} // namespace

struct myClass {
    myClass(asio::any_io_executor ex) : socket_(ex) {}

    void startReadPacketLoop(uint16_t port) {
        socket_.connect({{}, port});
        log_info("reading packets from sockets thread");
        post(socket_.get_executor(), [this] { do_receive(); });
    }

  private:
    std::string incoming_;

    void do_receive() { // continue receiving packets
        async_read_until(socket_, asio::dynamic_buffer(incoming_), "\n",
                         std::bind(&myClass::on_received, this, _1, _2));
    }

    void on_received(error_code ec, size_t len) {
        check(ec, "on_received");
        if (ec == asio::error::eof)
            return;

        // post processing task on free pool
        std::packaged_task<std::string()> task{std::bind(process_packet, incoming_.substr(0, len))};
        std::future<std::string> fut = post(pool_, std::move(task));

        // but post output on the strand
        post(output_strand_, [this, fut = std::move(fut)]() mutable {
            // note fut.get() may block
            auto response = std::make_shared<std::string>(fut.get() + "\n");
            async_write(socket_, asio::buffer(*response), [response](error_code ec, size_t /*n*/) {
                check(ec, "async_write");
                // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
            });
        });

        // consume packet from incoming
        incoming_.erase(0, len);

        // continue receiving packets
        do_receive();
    }

    static std::string process_packet(std::string request) {
        request.resize(1 + request.find_last_not_of("\r\n"));

        unsigned cksum = 0;
        for (uint8_t ch : request)
            cksum ^= ch;

        std::this_thread::sleep_for(1ms * cksum);
        log_info("slept for ", cksum, "ms after processing ", quoted(request));

        return request + ": " + std::to_string(cksum);
    }

    tcp::socket socket_;
    asio::thread_pool pool_{10};
    asio::strand<asio::thread_pool::executor_type> output_strand_{pool_.get_executor()};
};

int main() try {
    log_info("start");
    asio::io_context ioc;

    myClass mc(ioc.get_executor());
    mc.startReadPacketLoop(8989);

    ioc.run_for(10s);
    log_info("done");
} catch (boost::system::system_error const& se) {
    log_info("error: ", se.code().message());
}
With a simple server:
printf "qux\nbar\nfoo\n" | nc -t -l -p 8989 -w1&
Prints
0ms tid=0 start
0ms tid=0 reading packets from sockets thread
0ms tid=0 on_received: Success
0ms tid=0 on_received: Success
0ms tid=0 on_received: Success
0ms tid=0 on_received: End of file
103ms tid=0 slept for 102ms after processing "foo"
114ms tid=1 slept for 113ms after processing "bar"
124ms tid=2 slept for 124ms after processing "qux"
qux: 124
bar: 113
125ms tid=0 async_write: Success
foo: 102
125ms tid=0 async_write: Success
125ms tid=0 async_write: Success
10002ms tid=0 done
Note that the output order matches the input order, even though processing completes in reverse order due to the delays.
However, there are two problems.
If input arrives too fast, output stops¹.
As stated before, future::get() blocks. However, that's not the real issue, since the strand ensures only one of the pool's threads will ever block on a future at a time. What's happening is starvation.
Of course we can "fix" it by making pool_ arbitrarily large. E.g. with 1'000 threads, it does complete:
asio::thread_pool pool_{1'000};
Even so, it is clear that the output_strand_ is sometimes starved for CPU time. To really alleviate the situation, we can instead stop competing for pool_ threads at all, replacing the output_strand_ with a single dedicated thread:
tcp::socket socket_;
asio::thread_pool pool_{10}, output_{1};
Indeed, now we don't see starvation anymore.
Since output is now done on a different thread than the other accesses to socket_, we have a data race. To fix it, we need to add indirection:
// but post output on the dedicated output thread
post(output_, [this, fut = std::move(fut)]() mutable {
    // note fut.get() may block
    auto response = std::make_shared<std::string>(fut.get() + "\n");
    post(socket_.get_executor(), [=] {
        async_write( //
            socket_, asio::buffer(*response), //
            [response](error_code ec, [[maybe_unused]] size_t n) {
                check(ec, "async_write");
                // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
            });
    });
});
Note how the blocking wait is (importantly) not on the IO thread, but the async_write is.

With this in place, you may have to add a work guard, because if the input end closes the connection sooner than the processing is ready, there would not be any outstanding work keeping the ioc alive:
post(output_, [this, fut = std::move(fut), work = make_work_guard(socket_.get_executor())]() mutable {
The complete, updated listing:

#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::literals;
using namespace std::placeholders;
namespace /*anon*/ {
    static std::mutex mx;
    auto now = std::chrono::steady_clock::now;
    static auto const start = now();

    static inline void log_info(auto const&... args) {
        static std::atomic_uint tid_gen{};
        thread_local unsigned const tid = tid_gen++;

        std::lock_guard lk(mx); // avoid interleaved console output
        std::cout << std::setw(10) << (now() - start) / 1ms << "ms tid=" << tid << " ";
        (std::cout << ... << args) << std::endl;
    }

    static void check(error_code const& ec, [[maybe_unused]] std::string_view where) {
        // log_info(where, ": ", ec.message());
        if (ec.failed() && ec != asio::error::eof)
            throw boost::system::system_error(ec);
    }
} // namespace

struct myClass {
    myClass(asio::any_io_executor ex) : socket_(ex) {}

    void startReadPacketLoop(uint16_t port) {
        socket_.connect({{}, port});
        log_info("reading packets from sockets thread");
        post(socket_.get_executor(), [this] { do_receive(); });
    }

  private:
    std::string incoming_;

    void do_receive() { // continue receiving packets
        async_read_until(socket_, asio::dynamic_buffer(incoming_), "\n",
                         std::bind(&myClass::on_received, this, _1, _2));
    }

    void on_received(error_code ec, size_t len) {
        check(ec, "on_received");
        if (ec == asio::error::eof)
            return;

        // post processing task on free pool
        std::packaged_task<std::string()> task{std::bind(process_packet, incoming_.substr(0, len))};
        std::future<std::string> fut = post(pool_, std::move(task));

        // but post output on the dedicated output thread
        post(output_, [this, fut = std::move(fut), work = make_work_guard(socket_.get_executor())]() mutable {
            // note fut.get() may block
            auto response = std::make_shared<std::string>(fut.get() + "\n");
            post(socket_.get_executor(), [this, response] {
                async_write( //
                    socket_, asio::buffer(*response), //
                    [response](error_code ec, [[maybe_unused]] size_t n) {
                        check(ec, "async_write");
                        // log_info("written (", ec.message(), ") ", n, " of ", response->length(), " transferred");
                    });
            });
        });

        // consume packet from incoming
        incoming_.erase(0, len);

        // continue receiving packets
        do_receive();
    }

    static std::string process_packet(std::string request) {
        request.resize(1 + request.find_last_not_of("\r\n"));

        unsigned cksum = 0;
        for (uint8_t ch : request)
            cksum ^= ch;

        std::this_thread::sleep_for(1ms * cksum);
        log_info("slept for ", cksum, "ms after processing ", quoted(request));

        return request + ": " + std::to_string(cksum);
    }

    tcp::socket socket_;
    asio::thread_pool pool_{10}, output_{1};
};

int main() try {
    log_info("start");
    asio::io_context ioc;

    myClass mc(ioc.get_executor());
    mc.startReadPacketLoop(8989);

    ioc.run_for(1h);
    log_info("done");
} catch (boost::system::system_error const& se) {
    log_info("error: ", se.code().message());
}
This approach isn't optimal. It allocates unnecessarily, uses blocking operations, and so on.
You could optimize by using a ringbuffer/disruptor-style container for the output queue. You'll need to use a monotonically increasing id for processed packets. As a bonus, if you make sure the container has suitable reference stability, you can avoid dynamically allocating the shared responses. A sketch of the re-sequencing idea follows below.
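To make the id-based approach concrete, here is a minimal re-sequencing sketch (hypothetical helper; a std::map stands in for the fixed-size ring for brevity): the reader tags each packet with a dense, increasing id, workers finish in any order, and only in-order batches are released for writing.

#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Releases processed packets strictly in id order, regardless of the order
// in which workers finish them. A real implementation might use a fixed-size
// ring indexed by id % capacity to avoid the map's allocations and to get
// the reference stability mentioned above.
class reorder_buffer {
    std::mutex mx_;
    std::map<uint64_t, std::string> done_; // finished, waiting for their turn
    uint64_t next_ = 0;                    // id the output side expects next

  public:
    // Called by a worker when packet `id` is processed. Returns the batch of
    // packets (possibly empty) that just became writable, in output order.
    std::vector<std::string> complete(uint64_t id, std::string packet) {
        std::lock_guard lk(mx_);
        done_.emplace(id, std::move(packet));

        std::vector<std::string> ready;
        while (!done_.empty() && done_.begin()->first == next_) {
            ready.push_back(std::move(done_.begin()->second));
            done_.erase(done_.begin());
            ++next_;
        }
        return ready;
    }
};

Whichever worker receives a non-empty batch posts the corresponding writes, in batch order, to the socket's executor. Because the ids are dense and monotonically increasing, output order equals input order, and no thread ever blocks on a future.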
¹ input.txt generated with sort -R /etc/dictionaries-common/words | grep -v \' > input.txt