c++ boost c++20 coroutine asio

How to wait on multiple CompletionTokens using C++20 coroutines with Boost.Asio?


Background

I'm using Boost.Asio. I start multiple asynchronous operations and then want to wait on all of their CompletionTokens at once, for example async_read() and steady_timer::async_wait().

I implemented it using Boost.Asio's stackless coroutine. It works fine.

There are two timers. The first timer has a longer duration than the second, so the second timer fires first and cancels the first. The first timer emulates async_read(), and the second timer is its timeout.

Stackless coroutine approach

#include <iostream>
#include <chrono>
#include <boost/asio.hpp>

namespace as = boost::asio;

#include <boost/asio/yield.hpp>

template <typename Executor>
struct my_app {
    my_app(Executor exe)
        :exe_{std::move(exe)},
         worker_{*this}
    {}

private:
    friend struct worker;
    struct worker {
        worker(my_app& ma)
            :ma_{ma}
        {
            // initiate coroutine
            (*this)(boost::system::error_code{}, std::string{});
        }
        void operator()(
            boost::system::error_code const& ec,
            std::string timer_str // to distinguish timer
        ) const {
            reenter(coro_) {
                std::cout << "start" << std::endl;

                yield {
                    // set two timers
                    // timer1 is a kind of receive emulation 
                    // timer2 is a receive timeout
                    ma_.t1_.expires_after(std::chrono::milliseconds(200));
                    ma_.t1_.async_wait(
                        as::append(
                            *this,
                            "timer1"
                        )
                    );
                    ma_.t2_.expires_after(std::chrono::milliseconds(100));
                    ma_.t2_.async_wait(
                        as::append(
                            *this,
                            "timer2"
                        )
                    );
                }
                if (ec) {
                    std::cout << timer_str << " " << ec.message() << std::endl;
                }
                else {
                    std::cout << timer_str << " fired" << std::endl;
                    if (timer_str == "timer2") {
                        ma_.t1_.cancel();
                    }
                }
            }
        }
        my_app& ma_;
        mutable as::coroutine coro_;
    };

    Executor exe_;
    as::steady_timer t1_{exe_};
    as::steady_timer t2_{exe_};
    worker worker_;
};

#include <boost/asio/unyield.hpp>

int main() {
    as::io_context ioc;
    my_app ma{ioc.get_executor()};
    ioc.run();
}

Demo: https://godbolt.org/z/xM8o4Yzfa

How to implement the same logic using C++20 coroutines?

I tried to migrate from the stackless coroutine to a C++20 coroutine, but I couldn't find a way to wait on multiple CompletionTokens. How should I do this?

What I tried

use co_await step by step

#include <iostream>
#include <chrono>
#include <boost/asio.hpp>

namespace as = boost::asio;

template <typename Executor>
as::awaitable<void>
proc(Executor exe) {
    as::steady_timer t1{exe, std::chrono::milliseconds{200}};
    as::steady_timer t2{exe, std::chrono::milliseconds{100}};
    // how to multiwait t1 and t2 ?
    co_await t1.async_wait(as::use_awaitable);
    std::cout << "timer fired1" << std::endl;
    co_await t2.async_wait(as::use_awaitable);
    std::cout << "timer fired2" << std::endl;
}

int main() {
    as::io_context ioc;
    as::co_spawn(ioc, proc(ioc.get_executor()), as::detached);
    ioc.run();
}

Demo: https://godbolt.org/z/zjT8W6Wxr

It doesn't work as intended because the waits run one after the other: the wait on t2 only starts after t1 has fired, so t2 cannot act as a timeout for t1.

use multiple co_spawn

#include <iostream>
#include <chrono>
#include <boost/asio.hpp>

namespace as = boost::asio;

as::awaitable<void>
proc1(as::steady_timer& t1) {
    try {
        co_await t1.async_wait(as::use_awaitable);
        std::cout << "timer1 fired" << std::endl;
    }
    catch (std::runtime_error const& ec) {
        std::cout << "timer1 " << ec.what() << std::endl;
    }
}

as::awaitable<void>
proc2(as::steady_timer& t1, as::steady_timer& t2) {
    try {
        co_await t2.async_wait(as::use_awaitable);
        std::cout << "timer2 fired" << std::endl;
        t1.cancel();
    }
    catch (std::runtime_error const& ec) {
        std::cout << "timer2 " << ec.what() << std::endl;
    }
}

int main() {
    as::io_context ioc;

    as::steady_timer t1{ioc.get_executor(), std::chrono::seconds{2}};
    as::steady_timer t2{ioc.get_executor(), std::chrono::seconds{1}};

    as::co_spawn(ioc, proc1(t1), as::detached);
    as::co_spawn(ioc, proc2(t1, t2), as::detached);

    ioc.run();
}

Demo: https://godbolt.org/z/jba7dsYcx

It is technically possible, but it still has problems. For example, this approach does not seem to work inside an awaitable function (a function whose return type is as::awaitable<T>). I guess there are better ways.

Environment

Boost 1.83.0, x86-64 clang 17.0.1


Solution

  • Asio implements parallel groups, in a similar vein to the blog post linked in the comment by Louis:

    Coordinating Parallel Operations

    Though technically "experimental", this functionality is clearly here to stay. The experimental label indicates that some aspects of its interface or semantics might still change slightly¹.

    The documentation shows how to work with them:

    The conditions for completion of the group may be specified using one of the four provided function objects wait_for_all, wait_for_one, wait_for_one_success, wait_for_one_error, or with a custom function

    However, since you're dealing with awaitables, by far the most convenient way to benefit from parallel operations is by using awaitable_operators:

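    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <iostream>
    namespace asio = boost::asio;
    using namespace asio::experimental::awaitable_operators; // provides && and || for awaitables
    using namespace std::chrono_literals;                    // for 200ms, 100ms

    asio::awaitable<void> both() {
        auto ex = co_await asio::this_coro::executor;
        asio::steady_timer t1{ex, 200ms};
        asio::steady_timer t2{ex, 100ms};

        // && waits for both operations (it stops early if one of them fails)
        co_await (t1.async_wait(asio::use_awaitable) && t2.async_wait(asio::use_awaitable));
        std::cout << "both completed" << std::endl;
    }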
    

    See it Live On Coliru.

    Note how the executor is intrinsic to the coroutine.

    Things become slightly more interesting with output, variadics, non-exception error propagation etc.:

    Live On Coliru

    using Timer = asio::use_awaitable_t<>::as_default_on_t<asio::steady_timer>;
    using Task  = asio::awaitable<void>;
    
    Task all(auto... duration) {
        auto ex = co_await asio::this_coro::executor;
    
        auto task = [ex](auto duration) -> Task {
            co_await Timer(ex, duration).async_wait();
            std::cout << "completed: " << duration << std::endl;
        };
    
        co_await (task(duration) && ...);
        std::cout << "all completed" << std::endl;
    }
    
    int main() {
        asio::io_context ioc;
        co_spawn(ioc, all(100ms, 200ms, 50ms, 2s), asio::detached);
        ioc.run();
    }
    

    Which you might prefer without the lambda:

    Task task(auto duration) {
        co_await Timer(co_await asio::this_coro::executor, duration).async_wait();
        std::cout << "completed: " << duration << std::endl;
    };
    
    Task all(auto... duration) {
        co_await (task(duration) && ...);
        std::cout << "all completed" << std::endl;
    }
    

    Printing

    completed: 50ms
    completed: 100ms
    completed: 200ms
    completed: 2s
    all completed
    

    Conversely:

    Task any(auto... duration) {
        co_await (task(duration) || ...);
        std::cout << "first completed" << std::endl;
    }
    

    Prints Live On Coliru

    completed: 50ms
    first completed
    

    And getting some results at the same time:

    template <typename Duration> asio::awaitable<Duration> task(Duration dur) {
        co_await Timer(co_await asio::this_coro::executor, dur).async_wait();
        co_return dur;
    };
    
    Task any(auto... dur) {
        using M = std::chrono::milliseconds;
        std::array<M, sizeof...(dur)> args{dur...};
    
        auto r = co_await (... || task(dur));
        std::cout << "Completed: #" << r.index() << "/" << args.size() << ", " << args.at(r.index()) << std::endl;
    
        assert(std::visit<M>(std::identity{}, r) == args.at(r.index()));
    }
    

    Printing Live On Coliru

    Completed: #2/4, 59ms
    

    ¹ I'm not acutely aware of any such changes over the last ~5 releases.