I would like to run a stream in which several of its elements are processed concurrently. The test code that I tried is the following:
use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .then(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .chunks_timeout(3, Duration::from_secs(20));

    let _result: Vec<_> = stream.collect().await;
}
This code runs, but it prints the 10 values one by one with a 1-second delay between them. This is the opposite of concurrency. What I would expect is to wait 3 seconds, get 3 numbers printed, then wait 1 second, and so on.
I think the tokio::time::sleep is fine, since with join_all I got the code working concurrently.
So what explains the lack of concurrency, and what can be done about it?
Streams are simply asynchronous iterators, and they are lazy; they do not add any per-element concurrency unless you ask for it.
The streaming functionality from tokio-stream is simply a different flavor of that from the futures crate (which is probably more common). They both use the same underlying Stream trait. The differences lie in their respective StreamExt traits: the one from the futures crate is more fully featured and generic, while the one from tokio-stream is slimmer and geared towards tokio (and timing in particular).
You'll want to use .buffered() or .buffer_unordered() from the futures crate, since those methods let the stream hold multiple pending futures at a time (thereby allowing them to run concurrently), but you'll still need .chunks_timeout() from tokio-stream. Unfortunately, you can't conveniently bring both traits into scope, since many calls like .then() or .map() would then be ambiguous; the best course is to follow the advice from tokio-stream and use the fully-qualified syntax for one of them.
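To illustrate the ambiguity and the fully-qualified workaround without pulling in either crate, here is a minimal sketch using only the standard library. The traits FuturesLikeExt and TokioLikeExt and the type Numbers are hypothetical stand-ins for the two StreamExt traits and a stream; they are not part of any real library.

```rust
// Two hypothetical extension traits that both provide `describe`,
// standing in for the two `StreamExt` traits.
trait FuturesLikeExt {
    fn describe(&self) -> String;
}

trait TokioLikeExt {
    fn describe(&self) -> String;
}

// Hypothetical type standing in for a stream.
struct Numbers;

impl FuturesLikeExt for Numbers {
    fn describe(&self) -> String {
        String::from("futures-like")
    }
}

impl TokioLikeExt for Numbers {
    fn describe(&self) -> String {
        String::from("tokio-like")
    }
}

fn main() {
    let n = Numbers;
    // `n.describe()` would not compile here: both traits are in scope,
    // so the compiler reports multiple applicable items.
    // Naming the trait explicitly resolves the ambiguity.
    let a = FuturesLikeExt::describe(&n);
    let b = <Numbers as TokioLikeExt>::describe(&n);
    println!("{} / {}", a, b);
}
```

The same pattern applies to the real traits: import one StreamExt normally and call the other trait's method as a path, passing the stream as the first argument.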
Here's a full example (changed from .then() to .map() because .buffered() expects a stream whose items are futures, which .map() produces, whereas .then() awaits each future itself):
use futures::stream::{self, StreamExt};
use std::time::Duration;
use tokio;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .map(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .buffered(5);

    let stream = tokio_stream::StreamExt::chunks_timeout(stream, 3, Duration::from_secs(20));

    let _result: Vec<_> = stream.collect().await;
}
This code completes in approximately two seconds, since it runs up to five of the ten sleeps concurrently.
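The timing can be estimated with a little arithmetic. The sketch below is a rough model, not a measurement: it assumes every task takes the same time and ignores scheduling overhead, so the work proceeds in "waves" of at most `limit` tasks. The function name estimated_total is made up for this illustration.

```rust
use std::time::Duration;

/// Rough wall-clock estimate for `tasks` equal-length futures when at most
/// `limit` of them are in flight at once (hypothetical helper, model only).
fn estimated_total(tasks: u32, limit: u32, per_task: Duration) -> Duration {
    assert!(limit > 0, "limit must be at least 1");
    // Number of waves needed: ceiling of tasks / limit.
    let waves = (tasks + limit - 1) / limit;
    per_task * waves
}

fn main() {
    let one_sec = Duration::from_secs(1);
    // A limit of 1 models the sequential `.then()` version from the question.
    println!("limit 1: {:?}", estimated_total(10, 1, one_sec));
    // A limit of 5 models `.buffered(5)`.
    println!("limit 5: {:?}", estimated_total(10, 5, one_sec));
    // A limit of 3 models `.buffered(3)`.
    println!("limit 3: {:?}", estimated_total(10, 3, one_sec));
}
```

Under this model the sequential version takes about ten seconds, a limit of 5 takes about two, and a limit of 3 takes about four (three full waves plus one leftover task).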