boost::asio::detail::cancellation_handler_base::call[virtual] == 0xFFFFFFFFFFFFFFFF。
/// Emits the signal and causes invocation of the slot's handler, if any.
void emit(cancellation_type_t type)
{
if (handler_)
handler_->call(type);
}
I can't find where the problem is anymore, the exception cannot be caught
ServerDome:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <cstdio>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::thread_pool;
using std::chrono::steady_clock;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::steady_timer;
namespace this_coro = boost::asio::this_coro;
using ssl_socket = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace std::literals::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;
awaitable<void> echo(tcp::socket socket)
{
steady_timer timer(co_await this_coro::executor);
try
{
char data[2000];
for (;;)
{
timer.expires_after(2ms);
auto result1 = co_await(async_read(socket, buffer(data, 2000), use_nothrow_awaitable) || timer.async_wait(use_nothrow_awaitable));
if (result1.index() == 1) {
//std::cout << "time out." << std::endl;
}
else {
auto [e,n] = std::get<0>(result1);
if (!e)
{
if (n)
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
else {
std::cout << e.message() << std::endl;
}
}
}
}
catch (std::exception& e)
{
std::printf("echo Exception: %s\n", e.what());
}
}
awaitable<void> listener()
{
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, { tcp::v4(), 5555 });
for (;;)
{
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket)), detached);
}
}
int main(int argc, char* argv[])
{
try
{
thread_pool pol(8);
boost::asio::signal_set signals(pol, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { pol.stop(); });
co_spawn(pol, listener(), detached);
pol.wait();
}
catch (std::exception& e)
{
std::printf("Exception: %s\n", e.what());
}
}
ClientDome:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <cstdio>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using std::chrono::steady_clock;
using boost::asio::thread_pool;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
namespace this_coro = boost::asio::this_coro;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;
thread_pool io_context(3);
awaitable<void> timeout(steady_clock::duration duration)
{
steady_timer timer(co_await this_coro::executor);
timer.expires_after(duration);
co_await timer.async_wait(use_nothrow_awaitable);
}
awaitable<void> echo3(tcp::socket socket)
{
try
{
char data[4024];
for (;;)
{
co_await timeout(std::chrono::milliseconds(2));
co_await async_write(socket, boost::asio::buffer(data, 2000), use_awaitable);
}
}
catch (const std::exception& e)
{
std::printf("echo2 Exception: %s\n", e.what());
}
}
awaitable<void> listener(tcp::socket socket)
{
try
{
auto executor = co_await this_coro::executor;
auto listen_endpoint =
*tcp::resolver(socket.get_executor()).resolve("127.0.0.1", std::to_string(5555),
tcp::resolver::passive).begin();
co_await socket.async_connect(listen_endpoint, use_awaitable);
co_spawn(executor, echo3(std::move(socket)), detached);
}
catch (const std::exception& e)
{
std::printf("connect Exception: %s\n", e.what());
}
}
int main()
{
try
{
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { io_context.stop(); });
for (size_t i = 0; i < 3; i++)
{
co_spawn(io_context, listener(std::move(tcp::socket(io_context))), detached);
}
io_context.wait();
}
catch (std::exception& e)
{
std::printf("main Exception: %s\n", e.what());
}
}
I expect the program to run without errors and exceptions
There are a two main things:
I noticed sparsely occuring SEGV in deadline_timer_service
. I noticed that you have a multi-threaded server context,but didn't guard the connections (echo
sessions with strand). To be completely honest, I didn't intuitively think you would require one, but I'm thinking that the parallel_group
that implements the ||
awaitable operator uses a shared cancellation mechanism that may not be threadsafe. In that case, a strand might be required:
asio::awaitable<void> listener() {
for (tcp::acceptor acc(co_await this_coro::executor, {{}, 5555});;) {
auto s = make_strand(acc.get_executor());
co_spawn( //
s, //
echo(co_await acc.async_accept(s, use_awaitable)), //
asio::detached);
}
}
With that the SEGV goes away.
I noticed client and server stop progressing after a while. After counting lines I noticed it would freeze up after the same amount of messages transferred each time: ~7800x2000 bytes. That's about 5GiB per socket.
I figured that the kernel might just block writes when the read buffer is at maximum size, because the client never reads the echoed packets in the client.
So, I added that. The neat thing to do would be to co_spawn
a full-duplex coroutine, but since we know that all packets are echoed 1:1, I did the simple thing and put the read inline with the write loop.
Effectively we replace the sleep(2ms)
with a "read indefinitely" (here it's important that the data
buffer has a higher capacity than the maximum expected reply) limited to 2ms:
co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));
Side note: now that we used parallel groups in the client code as well, I made sure to upgrade to strand executors for the coro as well.
With that change, all is well, and the server/client kept running without failures, tripping UBSan/ASan or deadlocking.
This program (potentially) combines client and server, time limited to 30s only for COLIRU. It logs frequent messages once-in-a-thousand times:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <set>
namespace asio = boost::asio;
using asio::use_awaitable;
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
constexpr auto nothrow_await = asio::as_tuple(use_awaitable);
using namespace asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;
void once_in(size_t n, auto&& action) { // helper for reduced frequency logging
static std::atomic_size_t counter_ = 0;
if ((++counter_ % n) == 0) {
if constexpr (std::is_invocable_v<decltype(action), size_t>)
std::move(action)(counter_);
else
std::move(action)();
}
}
asio::awaitable<void> timeout(auto duration) {
asio::steady_timer timer(co_await this_coro::executor);
timer.expires_after(duration);
co_await timer.async_wait(nothrow_await);
}
asio::awaitable<void> server_session(tcp::socket socket) {
try {
for (std::array<char, 2000> data;;) {
if (auto r = co_await (async_read(socket, asio::buffer(data), nothrow_await) || timeout(2ms));
r.index() == 1) {
once_in(1000, [&] { std::cout << "server_session time out." << std::endl; });
} else {
auto [e, n] = std::get<0>(r);
if (!e) {
once_in(1000, [&, n = n] {
std::cout << "server_session writing " << n << " bytes to "
<< socket.remote_endpoint() << std::endl;
});
if (n)
co_await async_write(socket, asio::buffer(data, n), use_awaitable);
} else {
std::cout << e.message() << std::endl;
}
}
}
} catch (boost::system::system_error const& se) {
std::cout << "server_session Exception: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
std::cout << "server_session Exception: " << e.what() << std::endl;
}
std::cout << "server_session closed" << std::endl;
}
asio::awaitable<void> listener(uint16_t port) {
for (tcp::acceptor acc(co_await this_coro::executor, {{}, port});;) {
auto s = make_strand(acc.get_executor());
co_spawn( //
s, //
server_session(co_await acc.async_accept(s, use_awaitable)), //
asio::detached);
}
}
asio::awaitable<void> client_session(uint16_t port) {
try {
tcp::socket socket(co_await this_coro::executor);
co_await socket.async_connect({{}, port}, use_awaitable);
for (std::array<char, 4024> data{0};;) {
co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));
auto w = co_await async_write(socket, asio::buffer(data, 2000 /*SEHE?!*/), use_awaitable);
once_in(1000, [&](size_t counter) {
std::cout << "#" << counter << " wrote " << w << " bytes from " << socket.local_endpoint()
<< std::endl;
});
}
} catch (boost::system::system_error const& se) {
std::cout << "client_session Exception: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
std::cout << "client_session Exception: " << e.what() << std::endl;
}
std::cout << "client_session closed" << std::endl;
}
int main(int argc, char** argv) {
auto flags = std::set<std::string_view>(argv + 1, argv + argc);
bool server = flags.contains("server");
bool client = flags.contains("client");
asio::thread_pool pool(server ? 8 : 3);
try {
asio::signal_set signals(pool, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { pool.stop(); });
if (server) {
co_spawn(pool, listener(5555), asio::detached);
}
if (client) {
co_spawn(make_strand(pool), client_session(5555), asio::detached);
co_spawn(make_strand(pool), client_session(5555), asio::detached);
co_spawn(make_strand(pool), client_session(5555), asio::detached);
}
std::this_thread::sleep_for(30s); // time limited for COLIRU
} catch (std::exception const& e) {
std::cout << "main Exception: " << e.what() << std::endl;
}
}
Local demo for clarity:
² also using experimental::as_tuple
for COLIRU