
Could not prove that closure is Send


I want to process a Vec of items concurrently. Each item involves some I/O, but the items are not dependent on one another. I could use futures::future::join_all (or, in my case, futures::future::try_join_all) to achieve that.

Since I don't care about the results (except for the error in the try_join_all case), I don't really want a Vec of units (Vec<()>) at the end. It's just a useless allocation where () (or Result<(), Error>) would be sufficient.

The futures crate docs mention using FuturesUnordered directly if needed, so I gave it a try (playground):

use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};

struct Foo;

fn main() {
    tokio::spawn(async move {
        let foos = [Foo, Foo, Foo];
        join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
    });
}

async fn process_foo(foo: &Foo) {
    // do something async with foo
}

async fn join_all_discard<I>(iter: I) -> ()
where
    I: IntoIterator,
    I::Item: Future<Output = ()>,
{
    let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
    while let Some(()) = stream.next().await {}
}

The error is

error: higher-ranked lifetime error
  --> src/lib.rs:9:5
   |
9  | /     tokio::spawn(async move {
10 | |         let foos = [Foo, Foo, Foo];
11 | |         join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
12 | |     });
   | |______^
   |
   = note: could not prove `[async block@src/lib.rs:9:18: 12:6]: std::marker::Send`

The compiler error appears only when the call to tokio::spawn is present (which makes sense, since spawn may need to send the future to a different thread).

Using join_all_discard(foos.iter().map(process_foo)).await (without the closure) eliminates the error, and so does using futures::future::join_all instead, so the flaw must be in my own implementation. I am lost. I suspect it has something to do with the generic bounds on join_all_discard.

P.S. To solve the real problem I wrote try_join_all_discard, which exhibits the same error, and looks like this:

async fn try_join_all_discard<I, E>(iter: I) -> Result<(), E>
where
    I: IntoIterator,
    I::Item: Future<Output = Result<(), E>>,
{
    let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
    loop {
        match stream.next().await {
            Some(Ok(())) => continue,
            Some(Err(e)) => break Err(e),
            None => break Ok(()),
        }
    }
}

Solution

  • Change the trait bounds so that the future is a separate type parameter F rather than the associated type I::Item:

    async fn join_all_discard<I, F>(iter: I) -> ()
    where
        I: IntoIterator<Item = F>,
        F: Future<Output = ()>
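
For reference, here is a minimal sketch of the whole function with these bounds, called through the same closure as in the question. To keep it free of external crates it makes two substitutions, both assumptions of this sketch rather than part of the original code: a plain poll-everything loop built on std::future::poll_fn stands in for FuturesUnordered, and std::thread::spawn with a tiny hand-written block_on stands in for tokio::spawn as the source of the Send + 'static requirement. The PROCESSED counter and the run_on_other_thread helper are hypothetical names added only to make the result observable.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical counter, used only to observe how many items were processed.
static PROCESSED: AtomicUsize = AtomicUsize::new(0);

struct Foo;

async fn process_foo(_foo: &Foo) {
    // Stand-in for real async I/O.
    PROCESSED.fetch_add(1, Ordering::SeqCst);
}

// The future type is the named parameter `F`, not the projection `I::Item`.
async fn join_all_discard<I, F>(iter: I)
where
    I: IntoIterator<Item = F>,
    F: Future<Output = ()>,
{
    let mut pending: Vec<Pin<Box<F>>> = iter.into_iter().map(Box::pin).collect();
    // Assumption: a simple replacement for FuturesUnordered that polls every
    // unfinished future on each wake-up and drops the ones that completed.
    poll_fn(move |cx| {
        pending.retain_mut(|fut| fut.as_mut().poll(cx).is_pending());
        if pending.is_empty() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await
}

// Minimal single-future executor: parks the thread until the waker fires.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Moving the async block into another thread requires it to be Send + 'static,
// which is the same requirement tokio::spawn places on its argument.
fn run_on_other_thread() -> usize {
    PROCESSED.store(0, Ordering::SeqCst);
    let task = async move {
        let foos = [Foo, Foo, Foo];
        join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
    };
    thread::spawn(move || block_on(task)).join().unwrap();
    PROCESSED.load(Ordering::SeqCst)
}
```

Calling run_on_other_thread() from main processes the three Foo values on the spawned thread and returns the number processed. With the futures crate available, the body of join_all_discard from the question can be kept unchanged under the new bounds; only the where clause differs.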