Do boost::asio C++20 coroutines support multithreading?
The boost::asio documentation examples are all single-threaded. Are there any multithreaded examples?
Yes.
In Asio, if multiple threads run the execution context, you normally don't even control which thread resumes your coroutine.
For more control, you can look at answers that ask how to switch executors mid-stream (controlling which strand or execution context may resume the coro).
Update to the comment:
To make the C++20 coroutine echo server sample multi-threaded, you could change two lines:
boost::asio::io_context io_context(1);
// ...
io_context.run();
Into
boost::asio::thread_pool io_context;
// ...
io_context.join();
Since each coro is an implicit (or logical) strand, nothing else is needed. Notes:
If you stay single-threaded, prefer asio::io_context(1) with the concurrency hint, because it can avoid synchronization overhead.
#include <boost/asio.hpp>
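#include <boost/asio/co_spawn.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <array>
#include <cstring>     // ::strsignal (POSIX)
#include <functional>  // std::mem_fn
#include <iostream>
#include <list>
#include <memory>      // std::shared_ptr, std::weak_ptr
#include <string_view>
#include <variant>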
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using boost::system::error_code;
using asio::ip::tcp;
using asio::detached;
using executor_type = asio::any_io_executor;
using socket_type = asio::use_awaitable_t<>::as_default_on_t<tcp::socket>; // or tcp::socket
//
using session_state = std::shared_ptr<socket_type>; // or any additional state
using handle = std::weak_ptr<session_state::element_type>;
using namespace std::string_view_literals;
using namespace asio::experimental::awaitable_operators;
asio::awaitable<void> echo_session(session_state s) {
    try {
        for (std::array<char, 1024> data;;) {
            size_t n = co_await s->async_read_some(asio::buffer(data));
            co_await async_write(*s, asio::buffer(data, n));
        }
    } catch (boost::system::system_error const& se) {
        if (se.code() != asio::error::operation_aborted) // expecting cancellation
            throw;
    } catch (std::exception const& e) {
        std::cout << "echo Exception: " << e.what() << std::endl;
        co_return;
    }
    error_code ec;
    co_await async_write(*s, asio::buffer("Server is shutting down\n"sv),
                         redirect_error(asio::use_awaitable, ec));
    // std::cout << "echo shutdown: " << ec.message() << std::endl;
}
asio::awaitable<void> listener(std::list<handle>& sessions) {
    auto ex = co_await this_coro::executor;
    for (tcp::acceptor acceptor(ex, {tcp::v4(), 55555});;) {
        session_state s = std::make_shared<socket_type>(
            co_await acceptor.async_accept(make_strand(ex), asio::use_awaitable));
        sessions.remove_if(std::mem_fn(&handle::expired)); // "garbage collect", optional
        sessions.emplace_back(s);
        co_spawn(ex, echo_session(s), detached);
    }
}
int main() {
    std::list<handle> handles;
    asio::thread_pool io_context;
    asio::signal_set signals(io_context, SIGINT, SIGTERM);
    auto handler = [&handles](std::exception_ptr ep, auto result) {
        try {
            if (ep)
                std::rethrow_exception(ep);
            int signal = get<1>(result);
            std::cout << "Signal: " << ::strsignal(signal) << std::endl;
            for (auto h : handles)
                if (auto s = h.lock()) {
                    // more logic could be implemented via members on a session_state struct
                    std::cout << "Shutting down live session " << s->remote_endpoint() << std::endl;
                    post(s->get_executor(), [s] { s->cancel(); });
                }
        } catch (std::exception const& e) {
            std::cout << "Server: " << e.what() << std::endl;
        }
    };
    co_spawn(io_context, listener(handles) || signals.async_wait(asio::use_awaitable), handler);
    io_context.join();
}
Online demo, and local demo: