I want to process a vec of items concurrently. Each item involves some I/O, but the items do not depend on one another. I could use futures::join_all (or, in my case, futures::try_join_all) to achieve that.
Since I don't care about the results (except for the error in the try_join_all case), I don't want a vec of units (Vec<()>) at the end; it's just a useless allocation where () (or Result<(), Error>) would be sufficient.
The futures crate docs mention using FuturesUnordered directly if needed, so I gave it a try (playground):
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};

struct Foo;

fn main() {
    tokio::spawn(async move {
        let foos = [Foo, Foo, Foo];
        join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
    });
}

async fn process_foo(foo: &Foo) {
    // do something async with foo
}

async fn join_all_discard<I>(iter: I) -> ()
where
    I: IntoIterator,
    I::Item: Future<Output = ()>,
{
    let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
    while let Some(()) = stream.next().await {}
}
The error is:
error: higher-ranked lifetime error
  --> src/lib.rs:9:5
   |
9  | /     tokio::spawn(async move {
10 | |         let foos = [Foo, Foo, Foo];
11 | |         join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
12 | |     });
   | |______^
   |
   = note: could not prove `[async block@src/lib.rs:9:18: 12:6]: std::marker::Send`
The compiler error appears only when the call to tokio::spawn is present (which kind of makes sense, since the future may need to be sent to a different thread).
Using join_all_discard(foos.iter().map(process_foo)).await (without the closure) eliminates the error, as does using futures::join_all, yet my own implementation is flawed. I am lost. I suspect it has something to do with the generic bounds on join_all_discard.
P.S. To solve the real problem I wrote try_join_all_discard, which exhibits the same error and looks like this:
async fn try_join_all_discard<I, E>(iter: I) -> Result<(), E>
where
    I: IntoIterator,
    I::Item: Future<Output = Result<(), E>>,
{
    let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
    loop {
        match stream.next().await {
            Some(Ok(())) => continue,
            Some(Err(e)) => break Err(e),
            None => break Ok(()),
        }
    }
}
Change the trait bounds as follows, naming the future type as its own type parameter F instead of bounding the projection I::Item:
async fn join_all_discard<I, F>(iter: I) -> ()
where
    I: IntoIterator<Item = F>,
    F: Future<Output = ()>