c++, c++20, c++-coroutine, io-uring

liburing: io_uring_submit() causes error when placed in await_suspend


I'm currently experimenting with C++ coroutines as an abstraction over io_uring. For that I have the following class:

class io_service final {
public:
   explicit io_service(unsigned size, threadpool& pool) : pool_(pool) {
      if (auto ret = io_uring_queue_init(size, &ring_, 0); ret < 0) {
         throw std::runtime_error{"Liburing error!"};
      }
   }

   ~io_service() {
      io_uring_queue_exit(&ring_);
   }

   void message_pump() {
      io_uring_cqe* cqe = nullptr;

      while (true) {
         auto ret = io_uring_wait_cqe(&ring_, &cqe);

         // Check the wait result first: cqe is not valid if the wait failed.
         if (ret < 0) {
            std::cerr << "Fatal error in io_uring_wait_cqe!\n";
            throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
         }

         auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));

         if (cqe->res < 0) {
            std::cerr << "Error while doing an asynchronous request: " 
                      << -cqe->res << " (" << strerror(-cqe->res) << ")\n";
            throw std::runtime_error{"Error while doing an asynchronous request : " 
                        + std::string(strerror(-cqe->res))};
         }

         data->status_code = cqe->res;

         pool_.push_task([handle = data->handle] { handle.resume(); });
         io_uring_cqe_seen(&ring_, cqe);
      }
   }

   [[nodiscard]] auto accept_async(int socket, sockaddr_in& in, socklen_t& socket_length) {
      return uring_awaitable{
         &ring_, 
         io_result::operation_type::accept, 
         io_uring_prep_accept,
         socket, 
         reinterpret_cast<sockaddr*>(&in), 
         &socket_length, 
         0
      };
   }

  
private:
   struct uring_awaiter {
      io_uring* ring_;
      io_uring_sqe* entry;
      io_result request_data{};

      explicit uring_awaiter(io_result::operation_type op_type, io_uring* ring, io_uring_sqe* sqe) : ring_(ring), entry(sqe), request_data{op_type} {}

      [[nodiscard]] bool await_ready() const noexcept { return false; }

      void await_suspend(std::coroutine_handle<> handle) noexcept {
         request_data.handle = handle;
         io_uring_sqe_set_data(entry, &request_data);



         // SUBMITTING HERE LATER CAUSES ERRORS ==============================
         io_uring_submit(ring_);
         // ==================================================================



      }

      [[nodiscard]] int await_resume() const noexcept {
         return request_data.status_code;
      }
   };

   class uring_awaitable {
   public:
      template <typename F, typename... Args>
         requires requires(F f) { f(std::declval<io_uring_sqe*>(), std::declval<Args>()...); }
      uring_awaitable(io_uring* ring, io_result::operation_type op, F function, Args&&... args)
         : ring_(ring), sqe_(io_uring_get_sqe(ring_)), op_(op) {
         function(sqe_, std::forward<Args>(args)...);
      }

      auto operator co_await() const {
         return uring_awaiter{op_, ring_, sqe_};
      }

   private:
      io_uring* ring_;
      io_uring_sqe* sqe_;
      io_result::operation_type op_;
   };

   io_uring ring_{};
   bool interrupted_ = false;
   threadpool& pool_;
};

This class is meant to be used like this:

threadpool p{};
io_service s{128, p};

// In another thread, later
co_await s.accept_async(/* ... */);

The problem occurs when I call io_uring_submit() in await_suspend(), as indicated in the code above. I then get the output "Error while doing an asynchronous request: 125 (Operation canceled)". However, if I remove the submission from await_suspend() and change my message_pump() to something like this:

void message_pump() {
   using namespace std::chrono_literals;

   io_uring_cqe* cqe = nullptr;

   while (true) {
      // SUBMITTING HERE ==================================================
      std::this_thread::sleep_for(1s);
      io_uring_submit(&ring_);
      // ==================================================================

      auto ret = io_uring_wait_cqe(&ring_, &cqe);

      // Check the wait result first: cqe is not valid if the wait failed.
      if (ret < 0) {
         std::cerr << "Fatal error in io_uring_wait_cqe!\n";
         throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
      }

      auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));

      if (cqe->res < 0) {
         std::cerr << "Error while doing an asynchronous request: " << -cqe->res << " (" << strerror(-cqe->res) << ")\n";
         throw std::runtime_error{"Error while doing an asynchronous request : " + std::string(strerror(-cqe->res))};
      }

      data->status_code = cqe->res;

      pool_.push_task([handle = data->handle] { handle.resume(); });
      io_uring_cqe_seen(&ring_, cqe);
   }
}

Now everything works as expected. Obviously this is not the proper way to do things.

Why is the first approach not working?


Solution

  • Operations that complete through a kernel task are tied to the thread that called io_uring_submit(). That thread must not terminate before the CQE has been completed in the kernel. If you submit SQEs from a dynamic thread pool, you risk losing completions.

    I'm not 100% certain that accept uses a kernel task, or that this case returns -ECANCELED, but I had to switch to dedicated submission threads for uring_cmds because of this.

    The recommendation in the liburing feature request "submit requests from any thread" is to either submit from a single thread or give each thread its own ring (see the very end of that discussion).
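
The "single thread to submit" option could be sketched roughly as follows. This is only an illustration under stated assumptions: submit_thread, post() and id() are hypothetical names that do not appear in the question or in liburing, and the sketch uses only the standard library. The tasks handed to post() are where the real code would make its liburing calls.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical helper: one long-lived thread that runs every submission.
// In the question's setup, a posted task would be the place to call
// io_uring_get_sqe(), the io_uring_prep_* function and io_uring_submit().
class submit_thread {
public:
    submit_thread() : worker_([this] { run(); }) {}

    ~submit_thread() {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            stopping_ = true;
        }
        cv_.notify_one();
        worker_.join();  // run() drains the queue before it returns
    }

    submit_thread(const submit_thread&) = delete;
    submit_thread& operator=(const submit_thread&) = delete;

    // Callable from any thread, including short-lived pool threads.
    void post(std::function<void()> task) {
        assert(task && "post() needs a non-empty task");
        {
            std::lock_guard<std::mutex> lock{mutex_};
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    // Id of the submitter thread, useful for checks and logging.
    std::thread::id id() const noexcept { return worker_.get_id(); }

private:
    void run() {
        std::unique_lock<std::mutex> lock{mutex_};
        while (true) {
            cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // only reached once stopping_ is set
            auto task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            task();  // always executes on the submitter thread
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so the members above exist first
};
```

With the question's io_service, await_suspend() would post a task that performs the submission instead of calling io_uring_submit(ring_) directly. liburing does not synchronize access to a ring, so the SQE setup would probably need to move into that task as well. Whether this removes the "Operation canceled" result for accept is not confirmed here.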