Challenge:
Main thread is sync and there's additional thread which runs Tokio Runtime.
I need to pass an async function from the sync thread to tokio thread.
Code:
use std::{future::Future, thread, thread::JoinHandle};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}
};
type Job = Box<dyn (FnOnce() -> dyn Future<Output = bool>) + Send>;
pub fn thread_init(receiver: UnboundedReceiver<Job>) {
let handler = thread::spawn(move || {
thread_main(receiver);
});
}
pub fn main() {
let (sender, receiver) = unbounded_channel::<Job>();
thread_init(receiver);
}
#[tokio::main(worker_threads = 1)]
async fn thread_main(mut receive: UnboundedReceiver<Job>) {
loop {
match receive.try_recv() {
Err(e) => match e {
tokio::sync::mpsc::error::TryRecvError::Disconnected => {}
tokio::sync::mpsc::error::TryRecvError::Empty => {
yield_now().await;
}
},
Ok(job) => {
tokio::spawn(job());
}
}
}
}
I get these errors:
`dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send` is not a future
the trait `std::future::Future` is not implemented for `dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send`
dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send must be a future or must implement `IntoFuture` to be awaited
required for `Box<dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send>` to implement `std::future::Future`
`dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send` cannot be unpinned
consider using `Box::pin`
required for `Box<dyn FnOnce() -> (dyn std::future::Future<Output = bool> + 'static) + std::marker::Send>` to implement `std::future::Future`
Note: this is not the actual use case, but a simplification of it.
Question: how can I correctly pass an async function to tokio?
tokio::spawn()
takes a future, not a function. You need to call this function and give the returned future to tokio::spawn()
:
tokio::spawn(job());
// Add parens ^^
Once you do this, you'll receive three errors, each of which shows up after solving the last one:
dyn Future
in this case). Box the return type to solve this.tokio::spawn()
must be Unpin
. Wrap the box in a Pin
to solve this.tokio::spawn()
must also be Send
. Add the Send
constraint to dyn Future
.Fixing these three issues, the Job
type becomes:
type Job = Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = bool> + Send>>) + Send>;
This is a bit of a mouthful. Because the general type Pin<Box<dyn Future + Send>>
comes up so frequently, the futures
crate provides a handy generic type alias BoxFuture
to make expressing these types less verbose. Using this type alias, we can simplify the definition of Job
to:
type Job = Box<dyn (FnOnce() -> BoxFuture<'static, bool>) + Send>;
Having said all of this, there isn't really a convincing reason I see to have Job
be a boxed function. Why can't it just be the boxed future? You don't really gain much by adding the FnOnce
layer. You can just do:
type Job = BoxFuture<'static, bool>;
If you do this, you don't need to add the parens -- tokio:spawn(job)
will work just fine since job
is already a future.