I'm using Boost.Asio. I call multiple asynchronous functions, then I want to wait multiple CompetionToken. For example, async_read()
and steady_timer::async_wait()
.
I implemented it using Boost.Asio's stackless coroutine. It works fine.
There are two timers. The first timer is longer time then the second one. So the second timer is fired earlier, and cancel the first timer.
The first timer is an emulation of async_read()
, and the second timer is its timeout.
#include <iostream>
#include <chrono>
#include <boost/asio.hpp>
namespace as = boost::asio;
#include <boost/asio/yield.hpp>
template <typename Executor>
struct my_app {
my_app(Executor exe)
:exe_{std::move(exe)},
worker_{*this}
{}
private:
friend struct worker;
struct worker {
worker(my_app& ma)
:ma_{ma}
{
// initiate coroutine
(*this)(boost::system::error_code{}, std::string{});
}
void operator()(
boost::system::error_code const& ec,
std::string timer_str // to distinguish timer
) const {
reenter(coro_) {
std::cout << "start" << std::endl;
yield {
// set two timers
// timer1 is a kind of receive emulation
// timer2 is a receive timeout
ma_.t1_.expires_after(std::chrono::milliseconds(200));
ma_.t1_.async_wait(
as::append(
*this,
"timer1"
)
);
ma_.t2_.expires_after(std::chrono::milliseconds(100));
ma_.t2_.async_wait(
as::append(
*this,
"timer2"
)
);
}
if (ec) {
std::cout << timer_str << " " << ec.message() << std::endl;
}
else {
std::cout << timer_str << " fired" << std::endl;
if (timer_str == "timer2") {
ma_.t1_.cancel();
}
}
}
}
my_app& ma_;
mutable as::coroutine coro_;
};
Executor exe_;
as::steady_timer t1_{exe_};
as::steady_timer t2_{exe_};
worker worker_;
};
#include <boost/asio/unyield.hpp>
int main() {
as::io_context ioc;
my_app ma{ioc.get_executor()};
ioc.run();
}
Demo: https://godbolt.org/z/xM8o4Yzfa
I tried to migrate from the stackless coroutine to C++20 coroutine. But I couldn't find a way to wait multiple CompletionToken. How should I do ?
#include <iostream>
#include <chrono>
#include <boost/asio.hpp>
namespace as = boost::asio;
template <typename Executor>
as::awaitable<void>
proc(Executor exe) {
as::steady_timer t1{exe, std::chrono::milliseconds{200}};
as::steady_timer t2{exe, std::chrono::milliseconds{100}};
// how to multiwait t1 and t2 ?
co_await t1.async_wait(as::use_awaitable);
std::cout << "timer fired1" << std::endl;
co_await t2.async_wait(as::use_awaitable);
std::cout << "timer fired2" << std::endl;
}
int main() {
as::io_context ioc;
as::co_spawn(ioc, proc(ioc.get_executor()), as::detached);
ioc.run();
}
Demo: https://godbolt.org/z/zjT8W6Wxr
It doesn't work well because after t1 is fired the second timer is set.
#include <iostream>
#include <chrono>
#include <boost/asio.hpp>
namespace as = boost::asio;
as::awaitable<void>
proc1(as::steady_timer& t1) {
try {
co_await t1.async_wait(as::use_awaitable);
std::cout << "timer1 fired" << std::endl;
}
catch (std::runtime_error const& ec) {
std::cout << "timer1 " << ec.what() << std::endl;
}
}
as::awaitable<void>
proc2(as::steady_timer& t1, as::steady_timer& t2) {
try {
co_await t2.async_wait(as::use_awaitable);
std::cout << "timer2 fired" << std::endl;
t1.cancel();
}
catch (std::runtime_error const& ec) {
std::cout << "timer2 " << ec.what() << std::endl;
}
}
int main() {
as::io_context ioc;
as::steady_timer t1{ioc.get_executor(), std::chrono::seconds{2}};
as::steady_timer t2{ioc.get_executor(), std::chrono::seconds{1}};
as::co_spawn(ioc, proc1(t1), as::detached);
as::co_spawn(ioc, proc2(t1, t2), as::detached);
ioc.run();
}
Demo: https://godbolt.org/z/jba7dsYcx
It is technically possible but still have problems. For example, this approach cannot seems to work in the awaitable function (the function has return value as::awaitable<T>
).
I guess that there are better ways.
Boost 1.83.0 x86-64 clang 17.0.1
Asio implements parallel groups, in a similar vein to the blog post commented by Louis:
Coordinating Parallel Operations
Though technically "experimental" this functionality is clearly here to stay. The experimental
nature indicates that some aspects of its API interface or semantics might still change slightly¹.
The documentation shows how to work with them:
The conditions for completion of the group may be specified using one of the four provided function objects
wait_for_all
,wait_for_one
,wait_for_one_success
,wait_for_one_error
, or with a custom function
However since you're dealing with awaitables, by far the most convenient way to benefit from parallel operations is by using awaitable_operators
:
asio::awaitable<void> both() {
auto ex = co_await asio::this_coro::executor;
asio::steady_timer t1{ex, 200ms};
asio::steady_timer t2{ex, 100ms};
co_await (t1.async_wait(asio::use_awaitable) && t2.async_wait(asio::use_awaitable));
std::cout << "both completed" << std::endl;
}
See it Live On Coliru.
Note how the executor is intrinsic to the coroutine.
Things become slightly more interesting with output, variadics, non-exception error propagation etc.:
using Timer = asio::use_awaitable_t<>::as_default_on_t<asio::steady_timer>;
using Task = asio::awaitable<void>;
Task all(auto... duration) {
auto ex = co_await asio::this_coro::executor;
auto task = [ex](auto duration) -> Task {
co_await Timer(ex, duration).async_wait();
std::cout << "completed: " << duration << std::endl;
};
co_await (task(duration) && ...);
std::cout << "all completed" << std::endl;
}
int main() {
asio::io_context ioc;
co_spawn(ioc, all(100ms, 200ms, 50ms, 2s), asio::detached);
ioc.run();
}
Which you might prefer without the lambda:
Task task(auto duration) {
co_await Timer(co_await asio::this_coro::executor, duration).async_wait();
std::cout << "completed: " << duration << std::endl;
};
Task all(auto... duration) {
co_await (task(duration) && ...);
std::cout << "all completed" << std::endl;
}
Printing
completed: 50ms
completed: 100ms
completed: 200ms
completed: 2s
all completed
Conversely:
Task any(auto... duration) {
co_await (task(duration) || ...);
std::cout << "first completed" << std::endl;
}
Prints Live On Coliru
completed: 50ms
first completed
And getting some results at the same time:
template <typename Duration> asio::awaitable<Duration> task(Duration dur) {
co_await Timer(co_await asio::this_coro::executor, dur).async_wait();
co_return dur;
};
Task any(auto... dur) {
using M = std::chrono::milliseconds;
std::array<M, sizeof...(dur)> args{dur...};
auto r = co_await (... || task(dur));
std::cout << "Completed: #" << r.index() << "/" << args.size() << ", " << args.at(r.index()) << std::endl;
assert(std::visit<M>(std::identity{}, r) == args.at(r.index()));
}
Printing Live On Coliru
Completed: #2/4, 59ms
¹ I'm not accutely aware of any such changes over the last ~5 release