Tags: asynchronous, rust, async-await, rust-tokio

Is there any way to create an async stream generator that yields the result of repeatedly calling a function?


I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather() in an infinite loop, with a 60-second delay between the end of one call and the start of the next.

A simplified version would look like this:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: the delay only starts once get_weather() has finished, so calls never pile up.
    }
}

Is there any way to do this easily?

Using tokio::timer::Interval will not work when get_weather() takes more than 60 seconds:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|_| get_weather())
}

If that happens, the next call will start immediately. I want to keep exactly 60 seconds between the end of the previous get_weather() call and the start of the next one.
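
To make the difference concrete, here is a minimal std-only sketch that computes call start times under the two schedules instead of actually sleeping. It is a simplified model, not Tokio's implementation: the helper names fixed_rate_starts and fixed_delay_starts and the sample durations are made up for illustration, and the fixed-rate model assumes a late tick is served as soon as the previous call finishes.

```rust
/// Start times (in seconds) when a timer ticks every `period` seconds and a
/// call that missed its tick starts as soon as the previous call finishes.
/// Hypothetical model of the interval-based approach.
fn fixed_rate_starts(durations: &[u64], period: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    let mut free_at = 0; // time at which the previous call finished
    for (i, d) in durations.iter().enumerate() {
        let tick = (i as u64 + 1) * period; // ticks at period, 2 * period, ...
        let start = tick.max(free_at);
        starts.push(start);
        free_at = start + d;
    }
    starts
}

/// Start times (in seconds) when each call begins `delay` seconds after the
/// previous call finished. Hypothetical model of the desired behaviour.
fn fixed_delay_starts(durations: &[u64], delay: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    let mut free_at = 0; // time at which the previous call finished
    for d in durations {
        let start = free_at + delay;
        starts.push(start);
        free_at = start + d;
    }
    starts
}

fn main() {
    // The second call is slow: it takes 90 seconds, longer than the period.
    let durations = [10, 90, 10];
    println!("{:?}", fixed_rate_starts(&durations, 60)); // [60, 120, 210]
    println!("{:?}", fixed_delay_starts(&durations, 60)); // [60, 130, 280]
}
```

In the fixed-rate output the third call starts at 210, the same instant the slow second call ends; in the fixed-delay output there is always a 60-second gap after each call finishes.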


Solution

  • Use stream::unfold to go from the "world of futures" to the "world of streams". We don't need any extra state, so we use the empty tuple:

    use futures::StreamExt; // 0.3.4
    use std::time::Duration;
    use tokio::time; // 0.2.11
    
    struct Weather;
    
    async fn get_weather() -> Weather {
        Weather
    }
    
    const BETWEEN: Duration = Duration::from_secs(1);
    
    fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
        futures::stream::unfold((), |_| async {
            time::delay_for(BETWEEN).await;
            let weather = get_weather().await;
            Some((weather, ()))
        })
    }
    
    #[tokio::main]
    async fn main() {
        get_weather_stream()
            .take(3)
            .for_each(|_v| async {
                println!("Got the weather");
            })
            .await;
    }
    
    % time ./target/debug/example
    
    Got the weather
    Got the weather
    Got the weather
    
    real    3.085   3085495us
    user    0.004   3928us
    sys     0.003   3151us
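
The answer passes the empty tuple because no state is needed. As a rough illustration of what threading state through unfold looks like, here is a blocking, std-only analog built on iterators rather than streams. Everything in it is an assumption for the sketch: the unfold helper is a hypothetical stand-in and not the futures API, get_weather takes a made-up counter argument, and Weather has a made-up reading field.

```rust
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
struct Weather {
    reading: u32, // hypothetical field, only here so items can be told apart
}

// Hypothetical blocking stand-in for the async get_weather().
fn get_weather(n: u32) -> Weather {
    Weather { reading: n }
}

/// Iterator analog of unfold: `step` receives the current state and returns
/// `Some((item, next_state))` to continue or `None` to stop.
fn unfold<S, T, F>(init: S, mut step: F) -> impl Iterator<Item = T>
where
    F: FnMut(S) -> Option<(T, S)>,
{
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Sleeps for `between`, then fetches; the state counts completed calls.
fn weather_iter(between: Duration) -> impl Iterator<Item = Weather> {
    unfold(0u32, move |count| {
        thread::sleep(between); // the delay begins only after the previous call returned
        Some((get_weather(count), count + 1))
    })
}

fn main() {
    let started = Instant::now();
    for weather in weather_iter(Duration::from_millis(10)).take(3) {
        println!("{:?} after {:?}", weather, started.elapsed());
    }
}
```

The async version in the answer has the same shape, with thread::sleep replaced by an awaited timer. On Tokio 1.x that timer function is tokio::time::sleep; delay_for is the 0.2 name used above.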
    

    See also: