rust, rust-tokio

Cannot use filter_map with buffer_unordered when concurrently executing tasks?


I'm looking at this example for concurrently downloading things in Rust.

Roughly, it looks like this:

#![feature(async_closure)]

use futures::{stream, StreamExt}; // 0.3.13

async fn foo() {
    let xs_new = stream::once(async { 42 })
        .map(async move |x| {
            Some(x + 1)
        })
        .buffer_unordered(42);
}

However, I was hoping to use filter_map to do something like this:

#![feature(async_closure)]

use futures::{stream, StreamExt}; // 0.3.13

async fn foo() {
    let xs_new = stream::once(async { 42 })
        .filter_map(async move |x| if x % 2 == 0 { Some(x + 1) } else { None })
        .buffer_unordered(42);
}

This fails to compile with an error along the lines of: "{integer} is not a Future; the trait ... is not implemented for {integer}".

Does anyone know why filter_map fails but map works fine?


Solution

  • buffer_unordered requires the Items of the Stream to be Futures themselves. Using map with an async closure works because the closure turns each integer into a Future that yields Some(integer), and that Future becomes the stream's Item.

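    filter_map is different: its closure must return a Future that yields an Option, and filter_map awaits that Future itself, using the Option only to decide whether the item is kept. With an async closure that returns Some(x + 1), the Items left in the stream are plain integers rather than Futures, so buffer_unordered rejects them. To fix this, make the closure return Some(Future):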

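    #![feature(async_closure)]

    use futures::{stream, StreamExt}; // 0.3.13

    async fn foo() {
        let xs_new = stream::once(async { 42 })
            .filter_map(async move |x| {
                if x % 2 == 0 {
                    // Wrap the value in a Future so that the stream's Items
                    // are Futures, which is what buffer_unordered expects.
                    Some(async move { x + 1 })
                } else {
                    None
                }
            })
            .buffer_unordered(42);
    }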
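
To make the two layers of Futures easier to see, here is a minimal sketch that uses only the standard library. It does not use the futures crate and is not concurrent; it only models the types involved. The names `block_on`, `keep_even` and `run` are hypothetical helpers invented for this illustration: `keep_even` plays the role of the filter_map closure (an outer Future yielding an Option), and the inner `async move { x + 1 }` is the kind of Item that buffer_unordered would later drive.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; good enough here because every future in this
// sketch is ready the first time it is polled.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: drive a single future to completion on this thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in for the filter_map closure. The outer future (this async fn)
// yields an Option that decides whether the item is kept. The value inside
// Some is itself a future: the deferred work that a combinator such as
// buffer_unordered would poll later.
async fn keep_even(x: i32) -> Option<impl Future<Output = i32>> {
    if x % 2 == 0 {
        Some(async move { x + 1 })
    } else {
        None
    }
}

// Hypothetical driver: first await the filtering decision, then await the
// inner future of every item that was kept.
fn run(inputs: &[i32]) -> Vec<i32> {
    let mut out = Vec::new();
    for &x in inputs {
        if let Some(inner) = block_on(keep_even(x)) {
            out.push(block_on(inner));
        }
    }
    out
}

fn main() {
    println!("{:?}", run(&[41, 42, 44])); // prints [43, 45]
}
```

If `keep_even` returned `Some(x + 1)` instead, the kept items would be plain `i32` values and the second `block_on` call would not type-check, which mirrors the "{integer} is not a Future" error from the question.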