I am trying to write a general async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args)
async initiating function.
The goal is to wrap arbitrary, blocking, third-party functions in a thread, and provide an asio-based interface.
It's not perfect yet (for instance, I know I need to post the completion handlers on the executor instead of running them in the thread), but I feel quite close.
I have three issues and questions:
Why does the program stop before all completion handlers have run? I shouldn't need a work guard, since the async operation is ongoing, right? EDIT: I'm mistaken. The non-callback handlers aren't being called at all, as evidenced by putting a sleep_for(1s) after the run() call. So my question is instead, why not?
Is this code violating some asio principle? It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.
(bonus) I want to swap std::thread
with concurrency::task<void>
. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t<Self>>(move(self))
, but this caused the three handlers to just print str:
without the args. I believe this has something to do with the fact that the Self
type (really a asio::detail::compose_op
) contains a moved-in copy of the impl
. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include "asio.hpp"
template <typename Fn, typename... Args>
struct async_task_impl {
std::decay_t<Fn> fn_;
std::tuple<std::decay_t<Args>...> args_;
async_task_impl(Fn&& fn, Args&&... args)
: fn_(std::forward<Fn>(fn)), args_(std::forward<Args>(args)...) {}
template <typename Self>
auto operator()(Self& self) {
// @todo: use concurrency::create_task
auto t =
std::thread([me = *this, // copy impl into thread
self = std::move(self) // move composed_op into thread?
]() mutable {
try {
std::apply(me.fn_, me.args_);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
});
t.detach();
}
};
// runs some blocking task on its own thread and wraps it in asio
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
return asio::async_compose<Token, void(std::exception_ptr)>(
async_task_impl(std::forward<Fn>(func), std::forward<Args>(args)...),
token, executor);
}
Test code: Godbolt
void slow_print(std::string str) {
static std::mutex m;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
{
std::unique_lock lk(m);
std::cout << "slow_print: " << str << "\n";
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
int main() {
try {
asio::io_context ctx;
using namespace std::string_literals;
async_task(
ctx, [](std::exception_ptr) { std::cout << "callback done\n"; },
slow_print, "callback"s);
asio::co_spawn(
ctx,
[&]() -> asio::awaitable<void> {
co_await async_task(ctx, asio::use_awaitable, slow_print, "coro"s);
},
asio::detached);
auto f = std::async(std::launch::async, [&] {
auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
fut.get();
});
ctx.run();
} catch (std::exception& e) {
std::cout << e.what() << "\n";
}
return 0;
}
- Why does the program stop before all completion handlers have run?
I have no direct idea, your own Godbolt link seems to contradict the premise, and so does this slightly embellished example: https://godbolt.org/z/WMKa4sqaE See below for some notes about the changes.
- Is this code violating some asio principle?
Maybe. See below.
It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.
Yes. The docs have a very similar example: "To see this in practice, let's use a detached thread to adapt a synchronous operation into an asynchronous one"
Beast has some helpers in their code base (stable_operation_base or something,
from the top of my head). Also see this blog post by Richard
Hodges
which creates a shared_composed_op
from a composed_op
that afford reference
stability of the standard operation implementation.
Yes. Resumed coroutines are not work - it's only when they suspend they usually enqueue a handler to resume.
This is already the case with non-c++20 stackful coros, as Tanner has made very explicitly clear on occasion:
While
spawn()
adds work to theio_service
(a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent theio_service
event loop from ending while a coroutine is outstanding, it may be necessary to add work to theio_service
before yielding.
What's worse: when you interact with more than one IO object they may be associated with different execution context, so you might need to track work on multiple executors.
The good news is that Asio (Chris) knew about this, which is why the signature of async_compose
takes a list of IoObjectsOrExecutors
:
template< typename CompletionToken, typename Signature, typename Implementation, typename... IoObjectsOrExecutors> DEDUCED async_compose( Implementation && implementation, CompletionToken & token, IoObjectsOrExecutors &&... io_objects_or_executors);
io_objects_or_executors
Zero or more I/O objects or I/O executors for which outstanding work must be maintained.
The composed operation specialized on your callable type will effective use boost::asio::prefer(ex, execution::outstanding_work.tracked)
on all of the associated executors.
So as long as the composed operation (self
) stays around, there should be work.
You pass the service ("execution context") itself instead of an executor. When passing executors, prefer to pass by value.
Again, I don't really know as I didn't exactly reproduce your complaints.
However, keep in mind the semantics of completion. In simplified pseudo-code, complete()
does:
void complete() {
work_.reset();
handler_();
}
In other words, don't expect the work guards to stick past completion. In fact the order is pretty central to the allocation guarantees of the library.
In C++ use std::flush
(or std::endl
) if you want output to appear.
Otherwise you might just be confused about output timing. This is frequently a
source of confusion when printing stuff from completion handlers in Asio.
For maximum insight, I'll introduce a variadic trace function that also timestamps each trace:
namespace {
using std::this_thread::sleep_for;
static auto now = std::chrono::steady_clock::now;
static auto start = now();
static std::mutex trace_mx;
static void trace(auto const&... args) {
std::unique_lock lk(trace_mx);
((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
}
} // namespace
use_future
I don't get what you tried to achieve with the std::async
version. As it stands you're demonstrating why std::async
has been a bad design.
If you are looking to demonstrate Asio's future support, I'd write:
auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
try {
fut.get();
std::cout << "future done" << std::endl;
} catch (std::exception const& e) {
std::cout << "future error: " << e.what() << std::endl;
}
Now, to avoid interfering with the service because the future will block, I'd suggest running the service in the background instead:
asio::thread_pool ctx{1};
Of course, you can invert the situation by introducing a thread for the blocking wait:
std::thread ft{[ex] {
auto fut = async_task(ex, asio::use_future, slow_print, "future");
try {
fut.get();
std::cout << "future done" << std::endl;
} catch (std::exception const& e) {
std::cout << "future error: " << e.what() << std::endl;
}
}};
ctx.run();
ft.join();
In your task implementation, you both move self
and copy *this
. However
compose_op
aggregates your async_task_impl
(as the impl_
member), so
there is a timing link between those. As far as I know the evaluation order of
lambda captures is unspecified.
I'd suggest avoiding the unnecessary copy:
std::thread([self = std::move(self)]() mutable {
auto& me = self.impl_;
try {
std::apply(me.fn_, me.args_);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
}).detach();
Or indeed, going for syntactic sugar:
std::thread([self = std::move(self)]() mutable {
auto& [fn, args] = self.impl_;
try {
std::apply(fn, args);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
}).detach();
To make it even more elegant, just pass the self
as a mutable argument
instead of capturing it (this may not work with concurrency::create_task
of
course):
std::thread([](auto self) {
auto& [fn, args] = self.impl_;
try {
std::apply(fn, args);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
}, std::move(self)).detach();
Another place where you are not 100% clear about the forwarding intent is in the async_task_impl
constructor. Args...
is already in non-deduced context there, so Args&&...
mandates rvalues. This might be why you used ""s
-literals?
There are several ways to fix
Either you can let the compiler do its job:
async_task_impl(Fn&& fn, Args... args)
: fn_(std::forward<Fn>(fn))
, args_(std::move(args)...) {}
If you feel that's a pessimization (does your code-base use expensive non-move-aware argument types?), the simplest is to make the construct an independent template:
template <typename Fn2, typename... Args2>
async_task_impl(Fn2&& fn, Args2&&... args)
: fn_(std::forward<Fn2>(fn))
, args_(std::forward<Args2>(args)...) {}
I would probably go all-the-way and be explicit about the decay moment using a deduction guide. The best part is you no longer require a constructor at all:
template <typename Fn, typename... Args> struct async_task_impl {
Fn fn_;
std::tuple<Args...> args_;
auto operator()(auto& self) const {
// @todo: use concurrency::create_task
std::thread(
[](auto self) {
auto& [fn, args] = self.impl_;
try {
std::apply(fn, args);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
}, std::move(self)).detach();
}
};
template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
// runs some blocking task on the windows thread pool and wraps it in a nice
// asio wrapper
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
return asio::async_compose<Token, void(std::exception_ptr)>(
async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
}
Combining all the above:
Live On Coliru Live On Compiler Explorer
#include "boost/asio.hpp"
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
namespace {
using std::this_thread::sleep_for;
static auto now = std::chrono::steady_clock::now;
static auto start = now();
static std::mutex trace_mx;
static void trace(auto const&... args) {
std::unique_lock lk(trace_mx);
((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
}
} // namespace
template <typename Fn, typename... Args> struct async_task_impl {
Fn fn_;
std::tuple<Args...> args_;
auto operator()(auto& self) const {
// @todo: use concurrency::create_task
std::thread(
[](auto self) {
auto& [fn, args] = self.impl_;
try {
std::apply(fn, args);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
},
std::move(self))
.detach();
}
};
template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
// wrap blocking task in an asio wrapper
namespace asio = boost::asio;
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor executor, Token&& token, Fn&& func, Args&&... args) {
return asio::async_compose<Token, void(std::exception_ptr)>(
async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
}
void slow_print(std::string str) {
sleep_for(500ms);
trace("slow_print: ", str);
sleep_for(500ms);
}
asio::awaitable<void> my_coro() {
auto ex = co_await asio::this_coro::executor;
co_await async_task(ex, asio::use_awaitable, slow_print, "coro");
trace("coro done");
}
void run_tests(auto ex) {
async_task(
ex, [](std::exception_ptr) { trace("callback done"); }, slow_print, "callback");
asio::co_spawn(ex, my_coro(), asio::detached);
std::thread ft{[ex] {
auto fut = async_task(ex, asio::use_future, slow_print, "future");
fut.get();
trace("future done");
}};
ft.join();
}
int main() try {
{
trace("Starting ctx1");
asio::io_context ctx1;
run_tests(ctx1.get_executor());
trace("Waiting ctx1");
ctx1.run();
trace("Done ctx1");
}
trace("----\n");
{
trace("Starting ctx2");
asio::thread_pool ctx2{1};
run_tests(ctx2.get_executor());
trace("Waiting ctx2");
ctx2.join();
trace("Done ctx2");
}
sleep_for(2s);
trace("Bye");
} catch (std::exception const& e) {
trace(e.what());
}
Prints
at 0ms Starting ctx1
at 500ms slow_print: callback
at 500ms slow_print: future
at 1000ms callback done
at 1000ms future done
at 1000ms Waiting ctx1
at 1501ms slow_print: coro
at 2001ms coro done
at 2001ms Done ctx1
at 2001ms ----
at 2001ms Starting ctx2
at 2501ms slow_print: callback
at 2501ms slow_print: future
at 2502ms slow_print: coro
at 3001ms callback done
at 3002ms future done
at 3002ms coro done
at 3002ms Waiting ctx2
at 3002ms Done ctx2
at 5002ms Bye