Tags: c++, boost-asio, c++20, c++-coroutine

Do boost::asio c++20 coroutines support multithreading?



The boost::asio documentation examples are all single-threaded. Are there any multithreaded examples?


Solution

  • Yes.

    In Asio, if multiple threads run the execution context, you normally don't even control which thread resumes your coroutine.

    Other answers cover how to switch executors mid-stream (controlling which strand or execution context may resume the coro).


    Update to the comment:

    To make the C++20 coroutine echo server sample multi-threaded, you could change two lines:

    boost::asio::io_context io_context(1);
    // ...
    io_context.run();
    

    Into

    boost::asio::thread_pool io_context;
    // ...
    io_context.join();
    

    Since each coro is an implicit (or logical) strand, nothing else is needed. Notes:

    • Doing this is likely pointless unless you do significant work inside the coroutines that would otherwise slow down IO multiplexing on a single thread.
    • In practice a single thread can easily handle 10k concurrent connections, especially with C++20 coroutines.
    • Conversely, staying single-threaded and constructing asio::io_context(1), i.e. with a concurrency hint of 1, can be a significant performance gain because it lets Asio avoid synchronization overhead.
    • When you introduce e.g. asynchronous session control or full-duplex IO, you will need an explicit strand. In the example below I show how you would make each "session" use a strand and, e.g., do a graceful shutdown.


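    #include <boost/asio.hpp>
    #include <boost/asio/co_spawn.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <array>
    #include <csignal>    // SIGINT, SIGTERM
    #include <cstring>    // ::strsignal (POSIX)
    #include <functional> // std::mem_fn
    #include <iostream>
    #include <list>
    #include <memory>
    #include <string_view>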
    
    namespace asio = boost::asio;
    namespace this_coro = asio::this_coro;
    using boost::system::error_code;
    using asio::ip::tcp;
    using asio::detached;
    using executor_type = asio::any_io_executor;
    using socket_type   = asio::use_awaitable_t<>::as_default_on_t<tcp::socket>; // or tcp::socket
                                                                                 //
    using session_state = std::shared_ptr<socket_type>;                          // or any additional state
    using handle        = std::weak_ptr<session_state::element_type>;
    
    using namespace std::string_view_literals;
    using namespace asio::experimental::awaitable_operators;
    
    asio::awaitable<void> echo_session(session_state s) {
        try {
            for (std::array<char, 1024> data;;) {
                size_t n = co_await s->async_read_some(asio::buffer(data));
                co_await async_write(*s, asio::buffer(data, n));
            }
        } catch (boost::system::system_error const& se) {
            if (se.code() != asio::error::operation_aborted) // expecting cancellation
                throw;
        } catch (std::exception const& e) {
            std::cout << "echo Exception: " << e.what() << std::endl;
            co_return;
        }
    
        error_code ec;
        co_await async_write(*s, asio::buffer("Server is shutting down\n"sv),
                             redirect_error(asio::use_awaitable, ec));
    
        // std::cout << "echo shutdown: " << ec.message() << std::endl;
    }
    
    asio::awaitable<void> listener(std::list<handle>& sessions) {
        auto ex = co_await this_coro::executor;
    
        for (tcp::acceptor acceptor(ex, {tcp::v4(), 55555});;) {
            session_state s = std::make_shared<socket_type>(
                co_await acceptor.async_accept(make_strand(ex), asio::use_awaitable));
    
            sessions.remove_if(std::mem_fn(&handle::expired)); // "garbage collect", optional
            sessions.emplace_back(s);
    
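            // spawn on the socket's strand so the session coroutine and the
            // cancel() posted from the signal handler are serialized
            co_spawn(s->get_executor(), echo_session(s), detached);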
        }
    }
    
    int main() {
        std::list<handle> handles;
    
        asio::thread_pool io_context;
        asio::signal_set signals(io_context, SIGINT, SIGTERM);
    
        auto handler = [&handles](std::exception_ptr ep, auto result) {
            try {
                if (ep)
                    std::rethrow_exception(ep);
    
                int signal = get<1>(result);
                std::cout << "Signal: " << ::strsignal(signal) << std::endl;
                for (auto h : handles)
                    if (auto s = h.lock()) {
                        // more logic could be implemented via members on a session_state struct
                        std::cout << "Shutting down live session " << s->remote_endpoint() << std::endl;
                        post(s->get_executor(), [s] { s->cancel(); });
                    }
            } catch (std::exception const& e) {
                std::cout << "Server: " << e.what() << std::endl;
            }
        };
    
        co_spawn(io_context, listener(handles) || signals.async_wait(asio::use_awaitable), handler);
    
        io_context.join();
    }
    
