Search code examples
c++boostasioc++-coroutine

Save boost asio async operation completion handler


I need to implement custom asynchronous operation using boost asio library. The operation itself will be performed by a 3rd-party library.

Approach, which I'm following is:

  1. start async operation via boost::asio::async_initiate. For this I'm implementing initiating function
  2. Store async completion handler, which is passed by boost::asio::async_initiate to the initiating function
  3. Start 3rd-party async operation
  4. Exit initiating function.
    According to the boost reference initiating function should be non-blocking.
  5. When async operation completes, invoke previously stored async completion handler

The code, replicating this is below. 3rd-party library here is simulated with a separate thread and boolean flag:

#include <iostream>
#include <string>
#include <thread>
#include <functional>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/co_spawn.hpp>

std::atomic<bool> asyncOpStarted = false;

using signature = void(std::string);
std::function<signature> completionHandler;

// Dummy implementation to simulate the async operation
void start_operation()
{
    asyncOpStarted = true;
    std::cout << "Async operation starting..." << std::endl;
}

void do_async_loop()
{
    while (!asyncOpStarted) {
        // sleep & check if operation started
        std::cout << "Waiting for async start..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    };

    std::cout << "Async operation started..." << std::endl;
    // simualate delay
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Async operation finished. Invoke completion handler..." << std::endl;
    completionHandler("Done");
}

// Async operation implementation
template <typename CompletionToken>
auto async_do_operation(boost::asio::io_context& io, CompletionToken&& token)
{  
    return boost::asio::async_initiate<CompletionToken, signature>(
        [&](auto&& handler) {
            // I need to store "handler" here

            // This doesn't work. std::function<...> requires handler to be copyable:
            //completionHandler = [h = std::move(handler)](std::string res) mutable {
            //    h(res);
            //};

            start_operation();
        }, 
        std::move(token)
    );
}

int main()
{
    std::thread t(do_async_loop);

    boost::asio::io_context io;
    
    boost::asio::co_spawn(io, [&]() -> boost::asio::awaitable<void> {
        try {
            // Use `boost::asio::use_awaitable` as the completion token
            auto result = co_await async_do_operation(io, boost::asio::use_awaitable);
            std::cout << "Coroutine: " << result << std::endl;
        } catch (...) {
            std::cerr << "Exception caught" << std::endl;
        }
    }, boost::asio::detached);

    // Run the io_context to process async events
    io.run();
  
}

I'm failing at step (2). Cannot store completion handler in any way. I tried the following:

  1. Use std::function and move completion handler into capture list
    That didn't work because std::function requires handler to be copyable
  2. Create shared pointer to the completion handler, which then could be captured to lambda.
  3. Store completion handler into a variable directly.

So, my questions are:

  1. Am I implementing the integration in a right way at all?
  2. If yes, what is the designed approach in boost asio to store completion handler while operation is running?

Thanks in advance.


Solution

  • You can use the relatively new asio::any_completion_handler type-erasing container.

    Here's a simplified version that also fixes some things:

    • std::forward instead of std::move
    • missing std::move on the synthesized handler (in async_initiate)
    • missing thread join
    • not juggling references to execution context, instead decoupling that by passing executors (or even detecting the current coroutine's executor).

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <string>
    #include <thread>
    namespace asio = boost::asio;
    
    std::atomic<bool> asyncOpStarted = false;
    
    using signature = void(std::string);
    asio::any_completion_handler<signature> completionHandler;
    
    // Dummy implementation to simulate the async operation
    void start_operation() {
        asyncOpStarted = true;
        std::cout << "Async operation starting..." << std::endl;
    }
    
    void do_async_loop() {
        while (!asyncOpStarted) {
            // sleep & check if operation started
            std::cout << "Waiting for async start..." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        };
    
        std::cout << "Async operation started..." << std::endl;
        // simualate delay
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Async operation finished. Invoke completion handler..." << std::endl;
        completionHandler("Done");
    }
    
    template <typename Token> //
    auto async_do_operation(asio::any_io_executor ex, Token&& token) {
        return asio::async_initiate<Token, signature>(
            [&](auto&& handler) {
                completionHandler =                                         //
                    [w = make_work_guard(ex), handler = std::move(handler)] //
                    (std::string res) mutable                               //
                {                                                           //
                    std::move(handler)(std::move(res));
                };
    
                start_operation();
            },
            std::forward<Token>(token));
    }
    
    asio::awaitable<void> coro() try {
        auto ex = co_await asio::this_coro::executor;
        std::cout << "Coroutine: " << co_await async_do_operation(ex, asio::deferred) << std::endl;
    } catch (...) {
        std::cerr << "Exception caught" << std::endl;
    }
    
    int main() {
        std::thread t(do_async_loop);
    
        asio::io_context io;
    
        co_spawn(io, coro, asio::detached);
    
        io.run();
    
        t.join();
    }
    

    Output:

    Waiting for async start...
    Async operation starting...
    Async operation started...
    Async operation finished. Invoke completion handler...
    Coroutine: Done