Search code examples
c++c++20c++-coroutine

Switching between threads with C++20 coroutines


There is an example of switching to a different thread with C++20 coroutines:

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>

auto switch_to_new_thread(std::jthread& out) {
    struct awaitable {
        std::jthread* p_out;
        bool await_ready() { return false; }
        void await_suspend(std::coroutine_handle<> h) {
            std::jthread& out = *p_out;
            if (out.joinable())
                throw std::runtime_error("Output jthread parameter not empty");
            out = std::jthread([h] { h.resume(); });
            // Potential undefined behavior: accessing potentially destroyed *this
            // std::cout << "New thread ID: " << p_out->get_id() << '\n';
            std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
        }
        void await_resume() {}
    };
    return awaitable{ &out };
}

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

int main() {
    std::jthread out;
    resuming_on_new_thread(out);
}

the coroutine starts on the main thread and switches to a newly created thread.

What is the right way to make it switch back to the main thread?

So the code below

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_main_thread();
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

would print

Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112

Solution

  • switch_to_new_thread actually creates a new thread, it doesn't switch to a new thread. It then injects code that resumes the coroutine in it.

    To run code on a specific thread, you have to actually run code on that thread. To resume a coroutine, that specific thread has to run code that resume that coroutine.

    Here you did it by creating a brand-new thread and injecting code that does a resume.


    A traditional way to do stuff like this is with a message pump. The thread you want to participate has a message pump and a queue of events. It runs the events in order.

    To make a specific thread run some code, you send a message to that queue of events with the instructions (maybe the actual code, maybe just a value) in it.

    To this end, such an "event consuming thread" is more than a std::jthread or std::thread; it is a thread safe queue and some in the thread popping tasks off it an executing them.

    In such a system, you'd move between threads by sending messages.

    So you'd have a queue:

    template<class T>
    struct threadsafe_queue {
      [[nodiscard]] std::optional<T> pop();
      [[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
      [[nodiscard]] bool push(T);
      template<class C, class D>
      [[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
      void abort();
      [[nodiscard]] bool is_aborted() const { return aborted; }
    private:
      mutable std::mutex m;
      std::condition_variable cv;
      std::deque<T> queue;
      bool aborted = false;
      auto lock() const { return std::unique_lock(m); }
    };
    

    of tasks:

    using task_queue = threadsafe_queue<std::function<void()>>;
    

    a basic message pump is:

    void message_pump( task_queue& q ) {
      while (auto f = q.pop()) {
        if (*f) (*f)();
      }
    }
    

    you'd then make two task_queues, one for your main thread and one for your worker thread. To switch to worker instead of creating a new jthread you'd:

    workerq.push( [=]{ h.resume(); } );
    

    and similarly to switch to the main

    mainq.push( [=]{ h.resume(); } );
    

    there are lots of details I have skipped over, but this is a sketch of how you'd do it.