Tags: c++, boost, c++20, boost-asio

Boost.Asio: how to use strands with C++20 coroutines


Consider the following code:

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

namespace io = boost::asio;

class test {
public:
    test(io::any_io_executor e) : exe{ std::move(e) } {}

    io::awaitable<void> delay(size_t sec) {
        io::steady_timer t{ exe };
        t.expires_after(std::chrono::seconds{ sec });
        co_await t.async_wait(io::use_awaitable);
    }

    io::awaitable<void> print_and_delay(int num) {
        std::cout << num << '\n';
        co_await delay(1);
    }

    io::awaitable<void> d_print() {
        co_await print_and_delay(1);
        co_await print_and_delay(2);
        co_await print_and_delay(3);
    }

    void start() {
        for (size_t i = 0; i != 3; ++i) {
            io::co_spawn(exe, d_print(), io::detached);
        }
    }

protected:
    io::any_io_executor exe;
};

int main() {
    io::io_context ctx;
    test t{ ctx.get_executor() };
    t.start();
    ctx.run();
    return 0;
}

Output:

1
1
1
(delay 1 second)
2
2
2
(delay 1 second)
3
3
3
(delay 1 second)

Now I want to make the execution of d_print serial: only one d_print coroutine should run at a time.

Expected output:

1
2
3
1
2
3
1
2
3

With each line delayed for 1 second.

Can I achieve this with strands?

What I want is something like a synchronized(st) { ... } block around the three print_and_delay calls: when the executor tries to run the other coroutines, they should be blocked until the running coroutine finishes.


Solution

  • A strand already protects the code posted through it against concurrent execution.

    The behaviour you see is correct, and a hypothetical "synchronized" block would not change it: a coroutine gives up the strand whenever it suspends at a co_await, so the other coroutines are free to run in the meantime.

    What you are after is not protection against concurrent/overlapping execution, because that already works. Instead, you want to avoid starting a new d_print coroutine before the previous one has finished.

    Solution

    The classical solution to this is queuing.

    Similar requirements come up with e.g. full-duplex IO where outgoing messages have to be queued to avoid interleaving writes.
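
    Stripped of Asio, the queuing idea might look like the sketch below. serial_queue, submit, drain and queue_demo are hypothetical names, the jobs are plain std::function objects instead of coroutines, and everything is synchronous, so this models only the ordering logic and not the asynchronous waiting.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not part of Asio): runs submitted jobs strictly
// one after another, even if jobs are submitted while another job runs.
class serial_queue {
  public:
    using job = std::function<void()>;

    void submit(job j) {
        // The running job stays at the front of the queue until it is done,
        // so an empty queue means that nobody is draining right now.
        bool not_running = queue_.empty();
        queue_.push_back(std::move(j));
        if (not_running)
            drain();
    }

  private:
    void drain() {
        while (!queue_.empty()) {
            // push_back on a deque keeps references to existing elements
            // valid, so the job may safely call submit() while it runs.
            queue_.front()();
            queue_.pop_front();
        }
    }

    std::deque<job> queue_;
};

// Runs three "d_print" jobs; two of them arrive while the first is running.
std::string queue_demo() {
    std::string out;
    serial_queue q;
    q.submit([&] {
        for (int i = 0; i != 2; ++i)
            q.submit([&] { out += "123"; }); // queued, not run re-entrantly
        out += "123";
    });
    return out; // "123123123"
}
```

    The point of the not_running check is that at most one drain loop exists at any time. Work that arrives while it is active is simply appended and picked up in order.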

    Strands are still useful to avoid unsynchronized access to the queue:


    #include <boost/asio.hpp>
    #include <chrono>
    #include <deque>
    #include <iostream>
    
    namespace asio = boost::asio;
    
    class test {
      public:
        test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}
    
        static asio::awaitable<void> delay(size_t sec) {
            co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
                .async_wait(asio::use_awaitable);
        }
    
        asio::awaitable<void> print_and_delay(int num) {
            std::cout << num << '\n';
            co_await delay(1);
        }
    
        asio::awaitable<void> d_print() {
            co_await print_and_delay(1);
            co_await print_and_delay(2);
            co_await print_and_delay(3);
        }
    
        void start() {
            post(strand_, std::bind(&test::do_start, this));
        }
    
      private:
        asio::strand<asio::any_io_executor> strand_;
    
        std::deque<asio::awaitable<void> > queue_;
    
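        asio::awaitable<void> drain_queue() { // runs on strand
            while (!queue_.empty()) {
                // The front entry stays queued while it is awaited, so a
                // non-empty queue doubles as the "drain in progress" flag.
                co_await std::move(queue_.front());
                queue_.pop_front();
            }
        }
    
        void do_start() { // runs on strand
            // If the queue is not empty, drain_queue() is already running and
            // will pick up whatever is appended here.
            bool not_running = queue_.empty();
    
            for (size_t i = 0; i != 3; ++i)
                queue_.push_back(d_print()); // created suspended; runs only when awaited
    
            if (not_running)
                co_spawn(strand_, std::bind(&test::drain_queue, this), asio::detached);
        }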
    };
    
    int main() {
        asio::io_context ctx;
    
        test t{ctx.get_executor()};
        t.start();
    
        ctx.run();
    }
    


    Other Thoughts

    You can also use the experimental Asio Channels feature, which could simplify thread-safety and give more flexibility.