Search code examples

C++ / asio / coroutine consumer, threaded producer: is a correct solution possible with existing mechanisms?

C++-20 with coroutines (co_await), with boost::asio. I'm implementing a producer/consumer relationship: the consumer runs in a co-routine, the producer fires from any other thread. Using std::queue for transport.

If it were reversed with coroutine as producer, it's straightforward: my coroutine would do a unique_lock, push into the queue, release the lock, then call the data_ready_condition.notify_one(). Threads would use data_ready_condition.wait() with the mutex, pop() from the queue with the mutex locked on the way out of wait(), unlock the mutex, and be on our way. Classic thread synchronization.

Back to the original problem: the threaded producer does the classic lock/push/unlock, but now the thread would do a data_ready_condition.notify_one() ... but there seems to be no such async version of std::condition_variable for the coroutine side, that would do this:

co_await data_ready_condition.async_wait(); // No such thing?

I can't find it, or anything truly equivalent, in the asio docs. I thought I was being clever using asio::steady_timer and using cancel() in place of notify_one(). But now I don't think this will cut it without the mutex really coordinated with it.

So is there a way to do a true async_wait() with either std::condition_variable or something that asio provides? (It seems like io_context would supply this bridge somehow, since that's related to its job, but again I can't find it.)

Any other std:: or 3rd-party library I should know about? The compiler itself provides co_await, right? But I don't see anything outside of asio designed to harness it.

What am I missing?

I'm using boost::asio 1.83, gcc version 13.2.0 (Rev2, Built by MSYS2 project), -std=c++20 -fcoroutines


  • Using std::queue for transport.

    std::queue is not a transport. It's a medium.

    I can't find it, or anything truly equivalent, in the asio docs

    I think I can, see below

    I thought I was being clever using asio::steady_timer and using cancel() in place of notify_one().

    Yes, this has been established pattern for about a (half) decade of Asio. Note that there is is even cancel_one these days, so you can more usefully model condition variables.

    [...] But now I don't think this will cut it without the mutex really coordinated with it.

    Somewhat true. You can get around the need for explicit synchronization (locking) by using strands ("Strands: Use Threads Without Explicit Locking"). However, if you really want to use non-asio threads you have no other option than to use tradional thread synchronization.


    Without further ado, let me suggest Channels.

    Note: "experimental" means the interface might still change, but the concept is here to stay

    In your case you could use a concurrent_channel and consume it from many coroutines.


    With a blocking (synchronous) producer thread:

    Live On Coliru

    #include <atomic>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <syncstream>
    using namespace std::chrono_literals;
    static inline auto out() {
        static constexpr auto now   = std::chrono::steady_clock::now;
        static auto const     start = now();
        static std::atomic_int        tid_gen = 0;
        static thread_local int const tid     = tid_gen++;
        return std::osyncstream(std::clog)                    //
            << std::unitbuf << "th:" << std::setw(2) << tid   //
            << std::setw(8) << (now() - start) / 1ms << "ms " //
    #include <boost/asio.hpp>
    #include <boost/asio/experimental/concurrent_channel.hpp>
    namespace asio = boost::asio;
    using boost::system::error_code;
    using Message = std::string;
    using Channel = asio::experimental::concurrent_channel<void(error_code, Message)>;
    static auto const err_shutdown = asio::error::eof; // TODO use own error_code
    asio::awaitable<void> delay(auto duration) {
        co_await asio::steady_timer(co_await asio::this_coro::executor, duration) //
    asio::awaitable<void> consumer(int id, Channel& chan) {
        out() << "Consumer " << id << " start" << std::endl;
        error_code ec;
        auto       token = redirect_error(asio::deferred, ec);
        for (; !ec.failed();) {
            Message msg = co_await chan.async_receive(token);
            if (!ec) {
                out() << "Consumer " << id << " handling " << quoted(msg) << std::endl;
                co_await delay(10ms); // simulate work
        if (ec == err_shutdown)
            chan.close(); // or cancel(), whatever makes sense for your application
        out() << "Consumer " << id << " leave (" << ec.message() << ")" << std::endl;
    void producer(Channel& chan) {
        auto send = [&chan](unsigned n, auto text, error_code ec = {}) {
            while (n -= chan.try_send_n(n, ec, text)) {
                out() << "Producer waiting for queue space\n";
        out() << "Producer start" << std::endl;
        for (auto text : {"foo", "bar", "qux"})
            send(3, text);
        out() << "Producer signal system shutdown" << std::endl;
        send(1, "", asio::error::eof);
        out() << "Producer leave" << std::endl;
    int main() {
        out() << "Start" << std::endl;
        asio::thread_pool ioc;
        Channel           chan(ioc, 10);
        for (auto i = 0; i < 10; ++i)
            co_spawn(ioc, consumer(i, chan), asio::detached);

    Typical outputs:

    enter image description here

    BONUS: Async Producer

    Note that synchronous producers may result in sub-optimal throughput (Boost Asio experimental channel poor performance). Also, I think async producers are much more elegant:

    Live On Coliru

    #include <atomic>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <syncstream>
    using namespace std::chrono_literals;
    static inline auto out() {
        static constexpr auto now   = std::chrono::steady_clock::now;
        static auto const     start = now();
        static std::atomic_int        tid_gen = 0;
        static thread_local int const tid     = tid_gen++;
        return std::osyncstream(std::clog)                    //
            << std::unitbuf << "th:" << std::setw(2) << tid   //
            << std::setw(8) << (now() - start) / 1ms << "ms " //
    #include <boost/asio.hpp>
    #include <boost/asio/experimental/concurrent_channel.hpp>
    namespace asio = boost::asio;
    using boost::system::error_code;
    using Message = std::string;
    using Channel = asio::experimental::concurrent_channel<void(error_code, Message)>;
    static auto const err_shutdown = asio::error::eof; // TODO use own error_code
    asio::awaitable<void> delay(auto duration) {
        co_await asio::steady_timer(co_await asio::this_coro::executor, duration) //
    asio::awaitable<void> consumer(int id, Channel& chan) {
        out() << "Consumer " << id << " start" << std::endl;
        error_code ec;
        auto       token = redirect_error(asio::deferred, ec);
        for (; !ec.failed();) {
            Message msg = co_await chan.async_receive(token);
            if (!ec) {
                out() << "Consumer " << id << " handling " << quoted(msg) << std::endl;
                co_await delay(10ms); // simulate work
        if (ec == err_shutdown)
            chan.close(); // or cancel(), whatever makes sense for your application
        out() << "Consumer " << id << " leave (" << ec.message() << ")" << std::endl;
    asio::awaitable<void> producer(Channel& chan) {
        out() << "Producer start" << std::endl;
        for (auto text : {"foo", "bar", "qux"})
            for (auto n : {1, 2, 3})
                co_await chan.async_send({}, text + std::to_string(n), asio::deferred);
        out() << "Producer signal system shutdown" << std::endl;
        co_await chan.async_send(asio::error::eof,"", asio::deferred);
        out() << "Producer leave" << std::endl;
    int main() {
        out() << "Start" << std::endl;
        asio::thread_pool ioc;
        Channel           chan(ioc, 10);
        for (auto i = 0; i < 10; ++i)
            co_spawn(ioc, consumer(i, chan), asio::detached);
        co_spawn(ioc, producer(chan), asio::detached);

    With very similar outputs:

    th: 0       0ms Start
    th: 1       0ms Consumer 0 start
    th: 2       0ms Consumer 1 start
    th: 4       0ms Consumer 2 start
    th: 3       0ms Consumer 3 start
    th: 3       0ms Consumer 8 start
    th: 1       0ms Consumer 7 start
    th: 5       0ms Consumer 4 start
    th: 2       0ms Producer start
    th: 6       0ms Consumer 5 start
    th: 2       0ms Consumer 1 handling "foo2"
    th: 6       0ms Consumer 2 handling "foo3"
    th: 3       0ms Consumer 8 handling "bar2"
    th: 7       0ms Consumer 6 start
    th: 4       1ms Consumer 7 handling "bar3"
    th: 4       1ms Consumer 4 handling "qux1"
    th: 7       1ms Consumer 5 handling "qux2"
    th: 4       1ms Producer signal system shutdown
    th: 5       0ms Consumer 3 handling "bar1"
    th: 8       0ms Consumer 9 start
    th: 1       0ms Consumer 0 handling "foo1"
    th: 4       1ms Producer leave
    th: 3       1ms Consumer 9 leave (End of file)
    th: 2       1ms Consumer 6 handling "qux3"
    th: 8      11ms Consumer 7 leave (Channel closed)
    th: 6      11ms Consumer 2 leave (Channel closed)
    th: 5      11ms Consumer 4 leave (Channel closed)
    th: 7      11ms Consumer 5 leave (Channel closed)
    th: 3      11ms Consumer 3 leave (Channel closed)
    th: 1      11ms Consumer 0 leave (Channel closed)
    th: 8      11ms Consumer 8 leave (Channel closed)
    th: 4      11ms Consumer 6 leave (Channel closed)
    th: 2      11ms Consumer 1 leave (Channel closed)