I have this minimal example:
use std::{future::Future, pin::Pin, thread::JoinHandle, fmt::Debug};
use tokio::runtime::Runtime;
struct Callback<E> {
f: Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync>,
}
trait Provider {
fn setup(&self) -> JoinHandle<()>;
}
enum Foo {
A,
B
}
trait IntoFoo {
fn into_foo(&self) -> Foo;
}
impl<E: Debug + IntoFoo> Provider for Callback<E> {
fn setup(&self) -> JoinHandle<()> {
std::thread::spawn(move || {
// Running async function sycnhronously within another thread.
let rt = Runtime::new().unwrap();
rt.block_on(handle(Box::new(move || (self.f)())))
.expect("request loop failed")
})
}
}
async fn handle<E: Debug + IntoFoo + 'static>( callback_fn: Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync>) -> Result<(), E> {
perform(Box::new(move || (callback_fn)())).await
}
pub async fn perform<
E: Debug + IntoFoo>(
op: Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync>,
) -> Result<(), E> {
(op)().await
}
This is a dumbed down version of some real code where I essentially have to pass around an async callback, inside a struct. This callback is passed through multiple functions. One of them calls the function in a newly spawned thread.
The error I get is inside the thread spawning code, when handle
is called.
The error is:
error: lifetime may not live long enough
--> src/indexer/callback.rs:41:41
|
27 | fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
| - let's call the lifetime of this reference `'1`
...
41 | rt.block_on(handle_event(input, Box::new(move |ev: &Event| (self.f)(ev)), &retry_policy, utils))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cast requires that `'1` must outlive `'static`
How should I orchestrate this? Open to changing the struct field type and anything similar. I must note however: this function must be able to be called multiple times (it might be inside a loop in handle
).
A few other threads suggested async callbacks to be passed in a Box, and their results be a pinned boxed trait object. Which is why I tried this system.
You cannot use a non-'static
&self
reference in a new thread.
One option would be to use Arc
instead of Box
, and clone it:
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, thread::JoinHandle};
use tokio::runtime::Runtime;
struct Callback<E> {
f: Arc<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync>,
}
trait Provider {
fn setup(&self) -> JoinHandle<()>;
}
enum Foo {
A,
B,
}
trait IntoFoo {
fn into_foo(&self) -> Foo;
}
impl<E: Debug + IntoFoo + 'static> Provider for Callback<E> {
fn setup(&self) -> JoinHandle<()> {
let f = Arc::clone(&self.f);
std::thread::spawn(move || {
// Running async function sycnhronously within another thread.
let rt = Runtime::new().unwrap();
rt.block_on(handle(f)).expect("request loop failed")
})
}
}
async fn handle<E: Debug + IntoFoo + 'static>(
callback_fn: Arc<
dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync,
>,
) -> Result<(), E> {
perform(callback_fn).await
}
pub async fn perform<E: Debug + IntoFoo>(
op: Arc<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), E>> + Send + Sync>> + Send + Sync>,
) -> Result<(), E> {
op().await
}
You can use Arc
in less places and use Box
instead (if you have an existing API that you need to use) by wrapping the callback in its own function and box that.