C++20 with coroutines (co_await), with boost::asio. I'm implementing a producer/consumer relationship: the consumer runs in a coroutine, the producer fires from any other thread. Using std::queue for transport.
If it were reversed, with the coroutine as producer, it's straightforward: my coroutine would take a unique_lock, push into the queue, release the lock, then call data_ready_condition.notify_one(). Threads would use data_ready_condition.wait() with the mutex, pop() from the queue with the mutex locked on the way out of wait(), unlock the mutex, and be on their way. Classic thread synchronization.
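For reference, the classic pattern described above could be sketched roughly as follows. `BlockingQueue` and `run_demo` are hypothetical names made up for this illustration, and the sketch assumes one producer thread and one consumer thread passing `std::string` messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper type for illustration only.
class BlockingQueue {
  public:
    // Producer side: lock, push, unlock, then notify one waiter.
    void push(std::string msg) {
        {
            std::unique_lock lock(mutex_);
            queue_.push(std::move(msg));
        } // lock is released here, before notifying
        data_ready_condition_.notify_one();
    }

    // Consumer side: wait() releases the mutex while blocked and re-acquires
    // it before returning, so front()/pop() run with the mutex held.
    std::string pop() {
        std::unique_lock lock(mutex_);
        data_ready_condition_.wait(lock, [this] { return !queue_.empty(); });
        std::string msg = std::move(queue_.front());
        queue_.pop();
        return msg;
    }

  private:
    std::mutex              mutex_;
    std::condition_variable data_ready_condition_;
    std::queue<std::string> queue_;
};

// Runs one producer and one consumer thread; returns what the consumer received.
std::vector<std::string> run_demo(std::vector<std::string> const& input) {
    BlockingQueue            q;
    std::vector<std::string> received;

    std::thread consumer([&] {
        for (std::size_t i = 0; i < input.size(); ++i)
            received.push_back(q.pop());
    });
    std::thread producer([&] {
        for (auto const& s : input)
            q.push(s);
    });

    producer.join();
    consumer.join();
    assert(received.size() == input.size());
    return received;
}
```

Calling `run_demo({"foo", "bar", "qux"})` hands the three messages from the producer thread to the consumer thread. The blocking `wait()` in `pop()` is exactly the part that has no direct coroutine equivalent.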
Back to the original problem: the threaded producer does the classic lock/push/unlock, but now the thread would do a data_ready_condition.notify_one()... but there seems to be no async version of std::condition_variable for the coroutine side that would do this:
co_await data_ready_condition.async_wait(); // No such thing?
I can't find it, or anything truly equivalent, in the asio docs. I thought I was being clever using asio::steady_timer and using cancel() in place of notify_one(). But now I don't think this will cut it without the mutex really coordinated with it.
So is there a way to do a true async_wait() with either std::condition_variable or something that asio provides? (It seems like io_context would supply this bridge somehow, since that's related to its job, but again I can't find it.)
Any other std:: or 3rd-party library I should know about? The compiler itself provides co_await, right? But I don't see anything outside of asio designed to harness it.
What am I missing?
I'm using boost::asio 1.83, gcc version 13.2.0 (Rev2, Built by MSYS2 project), -std=c++20 -fcoroutines
> Using std::queue for transport.
std::queue is not a transport. It's a medium.
> I can't find it, or anything truly equivalent, in the asio docs
I think I can; see below.
> I thought I was being clever using asio::steady_timer and using cancel() in place of notify_one().
Yes, this has been an established Asio pattern for the better part of a decade.
Note that there is even cancel_one() these days, so you can model condition variables more usefully.
> [...] But now I don't think this will cut it without the mutex really coordinated with it.
Somewhat true. You can get around the need for explicit synchronization (locking) by using strands (see "Strands: Use Threads Without Explicit Locking" in the Asio documentation). However, if you really want to use non-Asio threads, you have no other option than to use traditional thread synchronization.
Without further ado, let me suggest Channels.
Note: "experimental" means the interface might still change, but the concept is here to stay.
In your case you could use a concurrent_channel and consume it from many coroutines.
With a blocking (synchronous) producer thread:
#include <atomic>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <string>
#include <syncstream>
#include <thread>
using namespace std::chrono_literals;

// Logging helper: prefixes each line with a small per-thread id and elapsed milliseconds
static inline auto out() {
    static constexpr auto now   = std::chrono::steady_clock::now;
    static auto const     start = now();

    static std::atomic_int        tid_gen = 0;
    static thread_local int const tid     = tid_gen++;

    return std::osyncstream(std::clog)                    //
        << std::unitbuf << "th:" << std::setw(2) << tid   //
        << std::setw(8) << (now() - start) / 1ms << "ms " //
        ;
}

#include <boost/asio.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace asio = boost::asio;
using boost::system::error_code;

using Message = std::string;
using Channel = asio::experimental::concurrent_channel<void(error_code, Message)>;

static auto const err_shutdown = asio::error::eof; // TODO use own error_code

asio::awaitable<void> delay(auto duration) {
    co_await asio::steady_timer(co_await asio::this_coro::executor, duration) //
        .async_wait(asio::deferred);
}

asio::awaitable<void> consumer(int id, Channel& chan) {
    out() << "Consumer " << id << " start" << std::endl;

    error_code ec;
    auto       token = redirect_error(asio::deferred, ec);

    // Receive until the channel reports an error (shutdown or closed)
    while (!ec.failed()) {
        Message msg = co_await chan.async_receive(token);
        if (!ec) {
            out() << "Consumer " << id << " handling " << quoted(msg) << std::endl;
            co_await delay(10ms); // simulate work
        }
    }

    if (ec == err_shutdown)
        chan.close(); // or cancel(), whatever makes sense for your application

    out() << "Consumer " << id << " leave (" << ec.message() << ")" << std::endl;
    co_return;
}

// Runs on a plain (non-Asio) thread and uses only the non-blocking try_send_n
void producer(Channel& chan) {
    auto send = [&chan](unsigned n, auto text, error_code ec = {}) {
        // try_send_n returns how many messages were accepted; retry the rest
        while (n -= chan.try_send_n(n, ec, text)) {
            out() << "Producer waiting for queue space\n";
            std::this_thread::sleep_for(1ms);
        }
    };

    out() << "Producer start" << std::endl;
    for (auto text : {"foo", "bar", "qux"})
        send(3, text);

    out() << "Producer signal system shutdown" << std::endl;
    send(1, "", asio::error::eof);

    out() << "Producer leave" << std::endl;
}

int main() {
    out() << "Start" << std::endl;
    asio::thread_pool ioc;

    Channel chan(ioc, 10);

    for (auto i = 0; i < 10; ++i)
        co_spawn(ioc, consumer(i, chan), asio::detached);

    producer(chan);

    ioc.join();
}
Typical outputs:
Note that synchronous producers may result in sub-optimal throughput (see the related question "Boost Asio experimental channel poor performance"). Also, I think async producers are much more elegant:
#include <atomic>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <string>
#include <syncstream>
using namespace std::chrono_literals;

// Logging helper: prefixes each line with a small per-thread id and elapsed milliseconds
static inline auto out() {
    static constexpr auto now   = std::chrono::steady_clock::now;
    static auto const     start = now();

    static std::atomic_int        tid_gen = 0;
    static thread_local int const tid     = tid_gen++;

    return std::osyncstream(std::clog)                    //
        << std::unitbuf << "th:" << std::setw(2) << tid   //
        << std::setw(8) << (now() - start) / 1ms << "ms " //
        ;
}

#include <boost/asio.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace asio = boost::asio;
using boost::system::error_code;

using Message = std::string;
using Channel = asio::experimental::concurrent_channel<void(error_code, Message)>;

static auto const err_shutdown = asio::error::eof; // TODO use own error_code

asio::awaitable<void> delay(auto duration) {
    co_await asio::steady_timer(co_await asio::this_coro::executor, duration) //
        .async_wait(asio::deferred);
}

asio::awaitable<void> consumer(int id, Channel& chan) {
    out() << "Consumer " << id << " start" << std::endl;

    error_code ec;
    auto       token = redirect_error(asio::deferred, ec);

    // Receive until the channel reports an error (shutdown or closed)
    while (!ec.failed()) {
        Message msg = co_await chan.async_receive(token);
        if (!ec) {
            out() << "Consumer " << id << " handling " << quoted(msg) << std::endl;
            co_await delay(10ms); // simulate work
        }
    }

    if (ec == err_shutdown)
        chan.close(); // or cancel(), whatever makes sense for your application

    out() << "Consumer " << id << " leave (" << ec.message() << ")" << std::endl;
    co_return;
}

// Coroutine producer: async_send suspends instead of polling when the channel is full
asio::awaitable<void> producer(Channel& chan) {
    out() << "Producer start" << std::endl;
    for (auto text : {"foo", "bar", "qux"})
        for (auto n : {1, 2, 3})
            co_await chan.async_send({}, text + std::to_string(n), asio::deferred);

    out() << "Producer signal system shutdown" << std::endl;
    co_await chan.async_send(asio::error::eof, "", asio::deferred);

    out() << "Producer leave" << std::endl;
}

int main() {
    out() << "Start" << std::endl;
    asio::thread_pool ioc;

    Channel chan(ioc, 10);

    for (auto i = 0; i < 10; ++i)
        co_spawn(ioc, consumer(i, chan), asio::detached);

    co_spawn(ioc, producer(chan), asio::detached);

    ioc.join();
}
With very similar outputs:
th: 0 0ms Start
th: 1 0ms Consumer 0 start
th: 2 0ms Consumer 1 start
th: 4 0ms Consumer 2 start
th: 3 0ms Consumer 3 start
th: 3 0ms Consumer 8 start
th: 1 0ms Consumer 7 start
th: 5 0ms Consumer 4 start
th: 2 0ms Producer start
th: 6 0ms Consumer 5 start
th: 2 0ms Consumer 1 handling "foo2"
th: 6 0ms Consumer 2 handling "foo3"
th: 3 0ms Consumer 8 handling "bar2"
th: 7 0ms Consumer 6 start
th: 4 1ms Consumer 7 handling "bar3"
th: 4 1ms Consumer 4 handling "qux1"
th: 7 1ms Consumer 5 handling "qux2"
th: 4 1ms Producer signal system shutdown
th: 5 0ms Consumer 3 handling "bar1"
th: 8 0ms Consumer 9 start
th: 1 0ms Consumer 0 handling "foo1"
th: 4 1ms Producer leave
th: 3 1ms Consumer 9 leave (End of file)
th: 2 1ms Consumer 6 handling "qux3"
th: 8 11ms Consumer 7 leave (Channel closed)
th: 6 11ms Consumer 2 leave (Channel closed)
th: 5 11ms Consumer 4 leave (Channel closed)
th: 7 11ms Consumer 5 leave (Channel closed)
th: 3 11ms Consumer 3 leave (Channel closed)
th: 1 11ms Consumer 0 leave (Channel closed)
th: 8 11ms Consumer 8 leave (Channel closed)
th: 4 11ms Consumer 6 leave (Channel closed)
th: 2 11ms Consumer 1 leave (Channel closed)