Search code examples
c++boost-asioppl

async_compose not keeping io_context full of work?


I am trying to write a general async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) async initiating function.

The goal is to wrap arbitrary, blocking, third-party functions in a thread, and provide an asio-based interface.

It's not perfect yet (for instance, I know I need to post the completion handlers on the executor instead of running them in the thread), but I feel quite close.

I have three issues and questions:

  1. Why does the program stop before all completion handlers have run? I shouldn't need a work guard, since the async operation is ongoing, right? EDIT: I'm mistaken. The non-callback handlers aren't being called at all, as evidenced by putting a sleep_for(1s) after the run() call. So my question is instead, why not?

  2. Is this code violating some asio principle? It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.

  3. (bonus) I want to swap std::thread with concurrency::task<void>. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t<Self>>(move(self)), but this caused the three handlers to just print str: without the args. I believe this has something to do with the fact that the Self type (really a asio::detail::compose_op) contains a moved-in copy of the impl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include "asio.hpp"

template <typename Fn, typename... Args>
struct async_task_impl {
  std::decay_t<Fn> fn_;
  std::tuple<std::decay_t<Args>...> args_;

  async_task_impl(Fn&& fn, Args&&... args)
      : fn_(std::forward<Fn>(fn)), args_(std::forward<Args>(args)...) {}

  template <typename Self>
  auto operator()(Self& self) {
    // @todo: use concurrency::create_task
    auto t =
        std::thread([me = *this,             // copy impl into thread
                     self = std::move(self)  // move composed_op into thread?
    ]() mutable {
          try {
            std::apply(me.fn_, me.args_);
            self.complete({});
          } catch (std::exception& e) {
            self.complete(std::current_exception());
          }
        });
    t.detach();
  }
};

// runs some blocking task on its own thread and wraps it in asio
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
  return asio::async_compose<Token, void(std::exception_ptr)>(
      async_task_impl(std::forward<Fn>(func), std::forward<Args>(args)...),
      token, executor);
}

Test code: Godbolt

void slow_print(std::string str) {
  static std::mutex m;
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  {
    std::unique_lock lk(m);
    std::cout << "slow_print: " << str << "\n";
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

int main() {
  try {
    asio::io_context ctx;

    using namespace std::string_literals;

    async_task(
        ctx, [](std::exception_ptr) { std::cout << "callback done\n"; },
        slow_print, "callback"s);

    asio::co_spawn(
        ctx,
        [&]() -> asio::awaitable<void> {
          co_await async_task(ctx, asio::use_awaitable, slow_print, "coro"s);
        },
        asio::detached);

    auto f = std::async(std::launch::async, [&] {
      auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
      fut.get();
    });

    ctx.run();
  } catch (std::exception& e) {
    std::cout << e.what() << "\n";
  }
  return 0;
}

Solution

  • SHORT ANSWERS

    1. Why does the program stop before all completion handlers have run?

    I have no direct idea, your own Godbolt link seems to contradict the premise, and so does this slightly embellished example: https://godbolt.org/z/WMKa4sqaE See below for some notes about the changes.

    1. Is this code violating some asio principle?

    Maybe. See below.

    It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.

    Yes. The docs have a very similar example: "To see this in practice, let's use a detached thread to adapt a synchronous operation into an asynchronous one"

    1. (bonus) I want to swap std::thread with concurrency::task. The problem is then that I can't use a move-only type in the lambda capture. I tried self = make_shared<remove_reference_t>(move(self)), but this caused the three handlers to just print str: without the args. I believe this has something to do with the fact that the Self type (really a asio::detail::compose_op) contains a moved-in copy of the impl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?

    Beast has some helpers in their code base (stable_operation_base or something, from the top of my head). Also see this blog post by Richard Hodges which creates a shared_composed_op from a composed_op that afford reference stability of the standard operation implementation.

    LONG ANSWERS

    Yes. Resumed coroutines are not work - it's only when they suspend they usually enqueue a handler to resume.

    This is already the case with non-c++20 stackful coros, as Tanner has made very explicitly clear on occasion:

    While spawn() adds work to the io_service (a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent the io_service event loop from ending while a coroutine is outstanding, it may be necessary to add work to the io_service before yielding.

    What's worse: when you interact with more than one IO object they may be associated with different execution context, so you might need to track work on multiple executors.

    The good news is that Asio (Chris) knew about this, which is why the signature of async_compose takes a list of IoObjectsOrExecutors:

    template<
        typename CompletionToken,
        typename Signature,
        typename Implementation,
        typename... IoObjectsOrExecutors>
    DEDUCED async_compose(
        Implementation && implementation,
        CompletionToken & token,
        IoObjectsOrExecutors &&... io_objects_or_executors);
    
    • io_objects_or_executors Zero or more I/O objects or I/O executors for which outstanding work must be maintained.

    The composed operation specialized on your callable type will effective use boost::asio::prefer(ex, execution::outstanding_work.tracked) on all of the associated executors.

    So as long as the composed operation (self) stays around, there should be work.

    Services Are Not IO Objects Or Executors

    You pass the service ("execution context") itself instead of an executor. When passing executors, prefer to pass by value.

    Then What Went Wrong?

    Again, I don't really know as I didn't exactly reproduce your complaints.

    However, keep in mind the semantics of completion. In simplified pseudo-code, complete() does:

    void complete() {
        work_.reset();
        handler_();
    }
    

    In other words, don't expect the work guards to stick past completion. In fact the order is pretty central to the allocation guarantees of the library.

    (More) Reliable Debug Output

    In C++ use std::flush (or std::endl) if you want output to appear. Otherwise you might just be confused about output timing. This is frequently a source of confusion when printing stuff from completion handlers in Asio.

    For maximum insight, I'll introduce a variadic trace function that also timestamps each trace:

    namespace {
        using std::this_thread::sleep_for;
        static auto now   = std::chrono::steady_clock::now;
        static auto start = now();
        static std::mutex trace_mx;
    
        static void trace(auto const&... args) {
            std::unique_lock lk(trace_mx);
            ((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
        }
    } // namespace
    

    Side Note use_future

    I don't get what you tried to achieve with the std::async version. As it stands you're demonstrating why std::async has been a bad design.

    If you are looking to demonstrate Asio's future support, I'd write:

    auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
    try {
        fut.get();
        std::cout << "future done" << std::endl;
    } catch (std::exception const& e) {
        std::cout << "future error: " << e.what() << std::endl;
    }
    

    Now, to avoid interfering with the service because the future will block, I'd suggest running the service in the background instead:

    asio::thread_pool ctx{1};
    

    Of course, you can invert the situation by introducing a thread for the blocking wait:

    std::thread ft{[ex] {
        auto fut = async_task(ex, asio::use_future, slow_print, "future");
        try {
            fut.get();
            std::cout << "future done" << std::endl;
        } catch (std::exception const& e) {
            std::cout << "future error: " << e.what() << std::endl;
        }
    }};
    
    ctx.run();
    ft.join();
    

    Double Moves

    In your task implementation, you both move self and copy *this. However compose_op aggregates your async_task_impl (as the impl_ member), so there is a timing link between those. As far as I know the evaluation order of lambda captures is unspecified.

    I'd suggest avoiding the unnecessary copy:

    std::thread([self = std::move(self)]() mutable {
        auto& me = self.impl_;
        try {
            std::apply(me.fn_, me.args_);
            self.complete({});
        } catch (std::exception& e) {
            self.complete(std::current_exception());
        }
    }).detach();
    

    Or indeed, going for syntactic sugar:

    std::thread([self = std::move(self)]() mutable {
        auto& [fn, args] = self.impl_;
        try {
            std::apply(fn, args);
            self.complete({});
        } catch (std::exception& e) {
            self.complete(std::current_exception());
        }
    }).detach();
    

    To make it even more elegant, just pass the self as a mutable argument instead of capturing it (this may not work with concurrency::create_task of course):

    std::thread([](auto self) {
            auto& [fn, args] = self.impl_;
            try {
                std::apply(fn, args);
                self.complete({});
            } catch (std::exception& e) {
                self.complete(std::current_exception());
            }
        }, std::move(self)).detach();
    

    Perfect Storage vs. Perfect Forwarding

    Another place where you are not 100% clear about the forwarding intent is in the async_task_impl constructor. Args... is already in non-deduced context there, so Args&&... mandates rvalues. This might be why you used ""s-literals?

    There are several ways to fix

    1. Either you can let the compiler do its job:

      async_task_impl(Fn&& fn, Args... args)
         : fn_(std::forward<Fn>(fn))
         , args_(std::move(args)...) {}
      
    2. If you feel that's a pessimization (does your code-base use expensive non-move-aware argument types?), the simplest is to make the construct an independent template:

      template <typename Fn2, typename... Args2>
      async_task_impl(Fn2&& fn, Args2&&... args)
          : fn_(std::forward<Fn2>(fn))
          , args_(std::forward<Args2>(args)...) {}
      
    3. I would probably go all-the-way and be explicit about the decay moment using a deduction guide. The best part is you no longer require a constructor at all:

      template <typename Fn, typename... Args> struct async_task_impl {
          Fn                  fn_;
          std::tuple<Args...> args_;
      
          auto operator()(auto& self) const {
              // @todo: use concurrency::create_task
              std::thread(
                  [](auto self) {
                      auto& [fn, args] = self.impl_;
                      try {
                          std::apply(fn, args);
                          self.complete({});
                      } catch (std::exception& e) {
                          self.complete(std::current_exception());
                      }
                  }, std::move(self)).detach();
          }
      };
      
      template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
      
      // runs some blocking task on the windows thread pool and wraps it in a nice
      // asio wrapper
      template <typename Executor, typename Token, typename Fn, typename... Args>
      auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
          return asio::async_compose<Token, void(std::exception_ptr)>(
              async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
      }
      

    Full Demo

    Combining all the above:

    Live On Coliru Live On Compiler Explorer

    #include "boost/asio.hpp"
    #include <iomanip>
    #include <iostream>
    using namespace std::chrono_literals;
    
    namespace {
        using std::this_thread::sleep_for;
        static auto now   = std::chrono::steady_clock::now;
        static auto start = now();
        static std::mutex trace_mx;
    
        static void trace(auto const&... args) {
            std::unique_lock lk(trace_mx);
            ((std::cout << "at" << std::setw(5) << (now() - start) / 1ms << "ms ") << ... << args) << std::endl;
        }
    } // namespace
    
    template <typename Fn, typename... Args> struct async_task_impl {
        Fn                  fn_;
        std::tuple<Args...> args_;
    
        auto operator()(auto& self) const {
            // @todo: use concurrency::create_task
            std::thread(
                [](auto self) {
                    auto& [fn, args] = self.impl_;
                    try {
                        std::apply(fn, args);
                        self.complete({});
                    } catch (std::exception& e) {
                        self.complete(std::current_exception());
                    }
                },
                std::move(self))
                .detach();
        }
    };
    
    template <typename... Init> async_task_impl(Init&&...) -> async_task_impl<std::decay_t<Init>...>;
    
    // wrap blocking task in an asio wrapper
    namespace asio = boost::asio;
    template <typename Executor, typename Token, typename Fn, typename... Args>
    auto async_task(Executor executor, Token&& token, Fn&& func, Args&&... args) {
        return asio::async_compose<Token, void(std::exception_ptr)>(
            async_task_impl{std::forward<Fn>(func), std::forward<Args>(args)...}, token, executor);
    }
    
    void slow_print(std::string str) {
        sleep_for(500ms);
        trace("slow_print: ", str);
        sleep_for(500ms);
    }
    
    asio::awaitable<void> my_coro() {
        auto ex = co_await asio::this_coro::executor;
        co_await async_task(ex, asio::use_awaitable, slow_print, "coro");
        trace("coro done");
    }
    
    void run_tests(auto ex) {
        async_task(
            ex, [](std::exception_ptr) { trace("callback done"); }, slow_print, "callback");
    
        asio::co_spawn(ex, my_coro(), asio::detached);
    
        std::thread ft{[ex] {
            auto fut = async_task(ex, asio::use_future, slow_print, "future");
            fut.get();
            trace("future done");
        }};
    
        ft.join();
    }
    
    int main() try {
        {
            trace("Starting ctx1");
            asio::io_context ctx1;
    
            run_tests(ctx1.get_executor());
    
            trace("Waiting ctx1");
            ctx1.run();
            trace("Done ctx1");
        }
    
        trace("----\n");
        {
            trace("Starting ctx2");
            asio::thread_pool ctx2{1};
    
            run_tests(ctx2.get_executor());
    
            trace("Waiting ctx2");
            ctx2.join();
            trace("Done ctx2");
        }
    
        sleep_for(2s);
        trace("Bye");
    } catch (std::exception const& e) {
        trace(e.what());
    }
    

    Prints

    at    0ms Starting ctx1
    at  500ms slow_print: callback
    at  500ms slow_print: future
    at 1000ms callback done
    at 1000ms future done
    at 1000ms Waiting ctx1
    at 1501ms slow_print: coro
    at 2001ms coro done
    at 2001ms Done ctx1
    at 2001ms ----
    
    at 2001ms Starting ctx2
    at 2501ms slow_print: callback
    at 2501ms slow_print: future
    at 2502ms slow_print: coro
    at 3001ms callback done
    at 3002ms future done
    at 3002ms coro done
    at 3002ms Waiting ctx2
    at 3002ms Done ctx2
    at 5002ms Bye