
How to make a tokio stream work concurrently


I would like to process a stream of items so that several items are handled concurrently. The test code I tried is the following:

use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .then(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .chunks_timeout(3, Duration::from_secs(20));

    let _result : Vec<_> = stream
        .collect().await;
}

This code runs, but it prints the 10 values one by one with a 1-second delay between them. This is the opposite of concurrency. What I would expect is to wait 3 seconds, get 3 numbers printed, then wait 1 second, and so on. I think tokio::time::sleep is fine, since I got the code working concurrently with join_all.

So what explains the lack of concurrency, and what can be done about it?


Solution

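  • Streams are simply asynchronous iterators, and they are lazy: they do not add any per-element concurrency unless you ask for it. In particular, .then() awaits each element's future to completion before it pulls the next element from the source, so the ten one-second sleeps run back to back.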
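
    To see the laziness point in isolation, here is a minimal sketch that times .then() against futures::future::join_all on the same kind of future. It assumes tokio (with the rt and time features), tokio-stream 0.1 and futures 0.3 as dependencies; work, runtime, time_then and time_join_all are hypothetical helpers, and the 50 ms delay is an arbitrary stand-in for the question's one-second sleep.

    ```rust
    // Assumed Cargo dependencies (not from the original answer):
    //   tokio = { version = "1", features = ["rt", "time"] }
    //   tokio-stream = "0.1"
    //   futures = "0.3"
    use std::time::{Duration, Instant};
    use tokio_stream::StreamExt; // brings tokio-stream's .then() and .collect() into scope

    // Arbitrary stand-in for the question's one-second sleep.
    const DELAY: Duration = Duration::from_millis(50);

    // Hypothetical unit of work: sleep, then hand the input back.
    async fn work(i: u32) -> u32 {
        tokio::time::sleep(DELAY).await;
        i
    }

    // Single-threaded runtime with the timer enabled, so sleep() works.
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to build runtime")
    }

    // .then() awaits each future to completion before pulling the next item,
    // so the sleeps happen one after another.
    fn time_then(n: u32) -> (Vec<u32>, Duration) {
        runtime().block_on(async {
            let start = Instant::now();
            let out: Vec<u32> = tokio_stream::iter(0..n).then(work).collect().await;
            (out, start.elapsed())
        })
    }

    // join_all polls every future together, so the sleeps overlap.
    fn time_join_all(n: u32) -> (Vec<u32>, Duration) {
        runtime().block_on(async {
            let start = Instant::now();
            let out = futures::future::join_all((0..n).map(work)).await;
            (out, start.elapsed())
        })
    }

    fn main() {
        let (out, t) = time_then(4);
        println!("then:     {:?} in {:?}", out, t);
        let (out, t) = time_join_all(4);
        println!("join_all: {:?} in {:?}", out, t);
    }
    ```

    With four items, time_then should take at least 200 ms (four sleeps back to back), while time_join_all should finish in a little over 50 ms because join_all polls all four futures together.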

    The streaming functionality in tokio-stream is simply a different flavor of the one in the futures crate (which is probably more common). Both use the same underlying Stream trait; the differences lie in their respective StreamExt traits. The one from the futures crate is more fully featured and generic, while the one from tokio-stream is slimmer and geared towards tokio (and timing in particular).
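
    Because the Stream trait is shared, a stream built by one crate can be adapted and consumed with the other crate's combinators. The sketch below assumes futures 0.3 and tokio-stream 0.1 as dependencies; sum_all and sums are hypothetical helpers. Only futures' StreamExt is imported, and tokio-stream's map is called through fully-qualified syntax.

    ```rust
    // Assumed Cargo dependencies:
    //   futures = "0.3"
    //   tokio-stream = "0.1"
    use futures::stream::{Stream, StreamExt}; // only futures' extension trait is imported

    // Generic over the shared Stream trait, so it accepts streams built by
    // either crate. fold() here is futures' StreamExt::fold.
    async fn sum_all<S>(s: S) -> i32
    where
        S: Stream<Item = i32>,
    {
        s.fold(0, |acc, x| async move { acc + x }).await
    }

    // Hypothetical helper returning both sums.
    fn sums() -> (i32, i32) {
        futures::executor::block_on(async {
            // Built by tokio-stream, consumed with futures' combinators.
            let a = sum_all(tokio_stream::iter(1..=4)).await;
            // Built by futures, adapted by tokio-stream through fully-qualified
            // syntax, then consumed with futures' combinators again.
            let mapped = tokio_stream::StreamExt::map(futures::stream::iter(1..=4), |x| x * 10);
            let b = sum_all(mapped).await;
            (a, b)
        })
    }

    fn main() {
        let (a, b) = sums();
        println!("{} {}", a, b);
    }
    ```

    No tokio runtime is needed here because nothing sleeps; futures' own block_on executor is enough.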

    You'll want .buffered() or .buffer_unordered() from the futures crate, since those methods let the stream keep multiple pending futures at a time (thereby allowing them to run concurrently); .buffered() yields results in the original order, while .buffer_unordered() yields them as soon as each completes. You'll still need .chunks_timeout() from tokio-stream, though. Unfortunately, you can't have both traits in scope, since many calls like .then() or .map() would be ambiguous; the best course is to follow the advice in the tokio-stream documentation and use fully-qualified syntax for one of them.
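
    The ordering difference between .buffered() and .buffer_unordered() is easy to observe by giving later items shorter sleeps. This is a sketch under assumed dependencies (tokio with rt and time, futures 0.3); run is a hypothetical helper, and the (n - i) * 20 ms delays are arbitrary values chosen so that later items finish first.

    ```rust
    // Assumed Cargo dependencies:
    //   tokio = { version = "1", features = ["rt", "time"] }
    //   futures = "0.3"
    use futures::stream::{self, StreamExt};
    use std::time::Duration;

    // Hypothetical helper: item i sleeps (n - i) * 20 ms, so later items finish
    // first. All n futures are allowed in flight at once.
    fn run(n: u64, unordered: bool) -> Vec<u64> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to build runtime");
        rt.block_on(async move {
            let futs = stream::iter(0..n).map(move |i| async move {
                tokio::time::sleep(Duration::from_millis((n - i) * 20)).await;
                i
            });
            if unordered {
                // Yields each output as soon as its future completes.
                futs.buffer_unordered(n as usize).collect::<Vec<u64>>().await
            } else {
                // Runs futures concurrently but yields outputs in input order.
                futs.buffered(n as usize).collect::<Vec<u64>>().await
            }
        })
    }

    fn main() {
        println!("buffered:         {:?}", run(4, false));
        println!("buffer_unordered: {:?}", run(4, true));
    }
    ```

    With n = 4 this should print [0, 1, 2, 3] for .buffered() and [3, 2, 1, 0] for .buffer_unordered(); both take about 80 ms, since all four sleeps overlap either way.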

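    Here's a full example. It uses .map() instead of .then() because .buffered() expects a stream whose items are futures: .map() produces those futures without awaiting them, whereas .then() awaits each one itself and yields only its output: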

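    use futures::stream::{self, StreamExt};
    use std::time::Duration;
    
    #[tokio::main]
    async fn main() {
        // .map() turns each number into a future without awaiting it...
        let stream = stream::iter(0..10)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
                println!("i={:?}", i);
            })
            // ...and .buffered(5) polls up to five of those futures at once,
            // yielding their outputs in the original order.
            .buffered(5);
    
        // Only futures' StreamExt is imported, so tokio-stream's
        // chunks_timeout has to be called with fully-qualified syntax.
        let stream = tokio_stream::StreamExt::chunks_timeout(stream, 3, Duration::from_secs(20));
    
        // This .collect() is futures' StreamExt::collect.
        let _result: Vec<_> = stream.collect().await;
    }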
    

    This code should complete in roughly two seconds: .buffered(5) keeps at most five of the ten sleeps in flight at a time, so they run in two overlapping batches of about one second each.
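
    A scaled-down sketch can check that estimate without waiting seconds. It reuses the pipeline from the full example but with 100 ms sleeps instead of one second and a 10 s chunk timeout (both arbitrary assumptions), so chunks only close when they reach three items or the stream ends. It assumes tokio (rt and time features), tokio-stream 0.1 and futures 0.3; run_scaled is a hypothetical helper.

    ```rust
    // Assumed Cargo dependencies:
    //   tokio = { version = "1", features = ["rt", "time"] }
    //   tokio-stream = "0.1"
    //   futures = "0.3"
    use futures::stream::{self, StreamExt};
    use std::time::{Duration, Instant};

    // Hypothetical helper: the answer's pipeline with the sleep scaled down to
    // 100 ms and a chunk timeout long enough that it never fires.
    fn run_scaled() -> (Vec<Vec<u32>>, Duration) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to build runtime");
        rt.block_on(async {
            let start = Instant::now();
            let s = stream::iter(0..10u32)
                .map(|i| async move {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    i
                })
                .buffered(5); // at most five sleeps in flight at once
            // Fully-qualified call, as in the answer; .collect() is futures'.
            let chunks: Vec<Vec<u32>> =
                tokio_stream::StreamExt::chunks_timeout(s, 3, Duration::from_secs(10))
                    .collect()
                    .await;
            (chunks, start.elapsed())
        })
    }

    fn main() {
        let (chunks, t) = run_scaled();
        println!("{:?} in {:?}", chunks, t);
    }
    ```

    It should report roughly 200 ms (two batches of five) and the chunks [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]; the last chunk is short because chunks_timeout flushes whatever it holds when the input stream ends.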