Below is code that does exactly what I want but fails type checking. I’m guessing that a fix involves Pin, Box, and dyn.
Is this possible?
use async_stream::stream;
use futures::stream::{FuturesUnordered, StreamExt};

pub fn map<U, V, W>(
    f: impl Fn(&U) -> W,
    items: Vec<U>,
) -> impl futures::Stream<Item = V>
where
    V: Send,
    W: futures::Future<Output = V> + Send,
{
    stream! {
        let mut futures = FuturesUnordered::new();
        let mut i = 2;
        if 2 <= items.len() {
            futures.push(tokio::spawn(f(&items[0])));
            futures.push(tokio::spawn(f(&items[1])));
            while let Some(result) = futures.next().await {
                let y = result.unwrap();
                yield y;
                futures.push(tokio::spawn(f(&items[i])));
                i += 1
            }
        }
    }
}

#[tokio::main]
async fn main() {
    async fn f(x: &u32) -> u32 {
        x + 1
    }
    let input = vec![1, 2, 3];
    let output = map(f, input);
    futures::pin_mut!(output);
    while let Some(x) = output.next().await {
        println!("{:?}", x);
    }
}
#[tokio::main]
async fn main() {
async fn f(x: &u32) -> u32 {
x + 1
}
let input = vec![1, 2, 3];
let output = map(f, input);
futures::pin_mut!(output);
while let Some(x) = output.next().await {
println!("{:?}", x);
}
}
The exact error is:
error[E0308]: mismatched types
--> src/main.rs:40:18
|
40 | let output = map(f, input);
| ^^^ lifetime mismatch
|
= note: expected associated type `<for<'_> fn(&u32) -> impl futures::Future<Output = u32> {f} as FnOnce<(&u32,)>>::Output`
found associated type `<for<'_> fn(&u32) -> impl futures::Future<Output = u32> {f} as FnOnce<(&u32,)>>::Output`
= note: the required lifetime does not necessarily outlive the empty lifetime
I have tried many variants: changing lifetimes, adding annotations built from variations of Box and Pin, and applying compiler suggestions. Rather than hacking, I'm trying to understand whether it is even possible to write code that performs this logic.
My mental model has been that Rust cannot be convinced of the safety of references, so some object needs to be boxed in order to give Rust constant (pinned) access to it. I've been trying to figure out which object(s).
As requested by @Chayim Friedman, one of the exact solutions I tried was to set the signature of map to the following.
pub fn map<U, V>(
    f: impl Fn(&U) -> Pin<Box<dyn Future<Output = V> + '_>>,
    items: Vec<U>,
) -> impl futures::Stream<Item = V>
where
    V: Send,
The problem was that Rust still complains that the future passed to tokio::spawn is not Send:
error[E0277]: `dyn futures::Future<Output = V>` cannot be sent between threads safely
--> src/main.rs:21:26
|
21 | futures.push(tokio::spawn(f(&items[0])));
| ^^^^^^^^^^^^ `dyn futures::Future<Output = V>` cannot be sent between threads safely
|
= help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Output = V>`
= note: required because of the requirements on the impl of `std::marker::Send` for `Unique<dyn futures::Future<Output = V>>`
= note: required because it appears within the type `Box<dyn futures::Future<Output = V>>`
= note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = V>>>`
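For context on this second error: a trait object only has the auto traits spelled out in its type, so dyn Future<Output = V> is not Send unless the type says + Send. Note also that tokio::spawn requires 'static, which the '_ lifetime in the signature above cannot provide, so adding Send alone would probably not be enough. The following is a minimal std-only sketch of a boxed future that is both Send and 'static because it owns its input. The names BoxedSendFuture, boxed_add_one, NoopWake, poll_once and run_on_thread are made up for illustration, and poll_once is a stand-in for a real executor that only works for futures that are ready on the first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical alias: the `+ Send + 'static` part is what the plain
// `dyn Future<Output = V>` in the error message is missing.
type BoxedSendFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Takes `x` by value, so the returned future borrows nothing.
fn boxed_add_one(x: u32) -> BoxedSendFuture<u32> {
    Box::pin(async move { x + 1 })
}

// A waker that does nothing; enough to poll a future once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the future a single time. Returns `None` if it is not ready yet.
fn poll_once<T>(mut fut: BoxedSendFuture<T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

// Moving the future into another thread compiles only because the
// trait object is declared `Send + 'static`.
fn run_on_thread(fut: BoxedSendFuture<u32>) -> Option<u32> {
    thread::spawn(move || poll_once(fut)).join().unwrap()
}

fn main() {
    println!("{:?}", run_on_thread(boxed_add_one(1)));
}
```

Removing + Send from the alias should make the thread::spawn call fail to compile with an error of the same kind as the one above.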
I don't think you need Pin, Box, or even references, for that matter.
You own the Vec, so just take elements out of it via move. That transfers the ownership of the elements to the f callback, which makes ownership management a lot easier.
use async_stream::stream;
use futures::stream::{FuturesUnordered, StreamExt};

pub fn map<U, W>(f: impl Fn(U) -> W, items: Vec<U>) -> impl futures::Stream<Item = W::Output>
where
    W: futures::Future + Send + 'static,
    W::Output: Send,
{
    stream! {
        // Convert into a fused iterator. Fused iterators
        // are guaranteed to return `None` continuously after
        // their last item.
        let mut iter = items.into_iter().fuse();
        let mut futures = FuturesUnordered::new();
        if let Some(el) = iter.next() {
            futures.push(tokio::spawn(f(el)));
        }
        if let Some(el) = iter.next() {
            futures.push(tokio::spawn(f(el)));
        }
        while let Some(result) = futures.next().await {
            let y = result.unwrap();
            yield y;
            if let Some(el) = iter.next() {
                futures.push(tokio::spawn(f(el)));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    async fn f(x: u32) -> u32 {
        x + 1
    }
    let input = vec![1, 2, 3];
    let output = map(f, input);
    futures::pin_mut!(output);
    while let Some(x) = output.next().await {
        println!("{:?}", x);
    }
}
2
3
4
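As a rough std-only analogy for why moving the elements helps: std::thread::spawn has the same Send + 'static requirement that tokio::spawn places on its argument. Handing each task an owned element satisfies 'static without any lifetime annotations, whereas a borrow of items[0] would not. The helper names spawn_all and add_one below are invented for this sketch and are not part of the code above.

```rust
use std::thread;

fn add_one(x: u32) -> u32 {
    x + 1
}

/// Hypothetical helper: runs `f` on each element in its own thread.
fn spawn_all<U, V>(f: fn(U) -> V, items: Vec<U>) -> Vec<V>
where
    U: Send + 'static,
    V: Send + 'static,
{
    // `into_iter()` moves each element out of the Vec, so every closure
    // owns its input and borrows nothing from `items`.
    let handles: Vec<_> = items
        .into_iter()
        .map(|item| thread::spawn(move || f(item)))
        .collect();

    // Join in spawn order, so results come back in input order.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", spawn_all(add_one, vec![1, 2, 3]));
}
```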
I had to rewrite parts of your code because your version didn't have a stop criterion. Once a task finished, it would always index past the end of the Vec and panic with an out-of-bounds access.
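To make the stop criterion easier to see in isolation, here is a synchronous sketch of the same refill pattern with the async parts removed. It is only an illustration: sliding_window and its window parameter are hypothetical names, and a VecDeque stands in for FuturesUnordered, so items finish strictly in input order here, which the real stream does not guarantee.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the stream: keeps at most `window` items
/// "in flight" and refills from the iterator until it runs dry.
fn sliding_window<T>(items: Vec<T>, window: usize) -> Vec<T> {
    let mut iter = items.into_iter().fuse();
    let mut in_flight: VecDeque<T> = VecDeque::new();
    let mut done = Vec::new();

    // Prime the queue with up to `window` items. A shorter input is fine:
    // `take` simply stops when the iterator is exhausted.
    for item in iter.by_ref().take(window) {
        in_flight.push_back(item);
    }

    // Each finished item makes room for one more. When `iter.next()`
    // returns `None`, nothing is pushed and the queue drains to empty,
    // which ends the loop instead of indexing out of bounds.
    while let Some(item) = in_flight.pop_front() {
        done.push(item);
        if let Some(next) = iter.next() {
            in_flight.push_back(next);
        }
    }
    done
}

fn main() {
    println!("{:?}", sliding_window(vec![1, 2, 3], 2));
}
```

Unlike the original version, this also handles inputs with fewer than two elements, because nothing is ever indexed directly. A window of 0 processes nothing.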