Search code examples
rustrust-tokio

Passing async function to tokio::spawn from another thread


Challenge:

Main thread is sync and there's additional thread which runs Tokio Runtime.

I need to pass an async function from the sync thread to tokio thread.

Code:

use std::{future::Future, thread, thread::JoinHandle};
use tokio::{
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}
};

type Job = Box<dyn (FnOnce() -> dyn Future<Output = bool>) + Send>;


pub fn thread_init(receiver: UnboundedReceiver<Job>) {
    let handler = thread::spawn(move || {
            thread_main(receiver);
    });
}

pub fn main() {
   let (sender, receiver) = unbounded_channel::<Job>();
   thread_init(receiver);
}

#[tokio::main(worker_threads = 1)]
async fn thread_main(mut receive: UnboundedReceiver<Job>) {
    loop {
        match receive.try_recv() {
            Err(e) => match e {
                tokio::sync::mpsc::error::TryRecvError::Disconnected => {}
                tokio::sync::mpsc::error::TryRecvError::Empty => {
                    yield_now().await;
                }
            },
            Ok(job) => {
                tokio::spawn(job());
            }
        }
    }
}

I get these errors:

`dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send` is not a future
the trait `std::future::Future` is not implemented for `dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send`
dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send must be a future or must implement `IntoFuture` to be awaited
required for `Box<dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send>` to implement `std::future::Future`
`dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send` cannot be unpinned
consider using `Box::pin`
required for `Box<dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send>` to implement `std::future::Future`

Note: this is not the actual use case, but a simplification of it.

Question: how can I correctly pass an async function to tokio?


Solution

  • tokio::spawn() takes a future, not a function. You need to call this function and give the returned future to tokio::spawn():

    tokio::spawn(job());
    // Add parens   ^^
    

    Once you do this, you'll receive three errors, each of which shows up after solving the last one:

    • You can't invoke a function that returns an unsized type (dyn Future in this case). Box the return type to solve this.
    • The future given to tokio::spawn() must be Unpin. Wrap the box in a Pin to solve this.
    • The future given to tokio::spawn() must also be Send. Add the Send constraint to dyn Future.

    Fixing these three issues, the Job type becomes:

    type Job = Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = bool> + Send>>) + Send>;
    

    This is a bit of a mouthful. Because the general type Pin<Box<dyn Future + Send>> comes up so frequently, the futures crate provides a handy generic type alias BoxFuture to make expressing these types less verbose. Using this type alias, we can simplify the definition of Job to:

    type Job = Box<dyn (FnOnce() -> BoxFuture<'static, bool>) + Send>;
    

    Having said all of this, there isn't really a convincing reason I see to have Job be a boxed function. Why can't it just be the boxed future? You don't really gain much by adding the FnOnce layer. You can just do:

    type Job = BoxFuture<'static, bool>;
    

    If you do this, you don't need to add the parens -- tokio:spawn(job) will work just fine since job is already a future.