What is the proper way to structure a Rust program for the following common use case?
I have a process that queries servers for data. It's complicated: it queries several servers and interleaves the streams they return. The total amount of data is too big to fit in memory. I'd like to represent the result of that process as some kind of data structure that can be awaited every time a new token arrives from the (possibly interleaved) streams.
What is the proper Rust/tokio data structure to use?
In Python or JavaScript, for example, this would be an asynchronous generator:
```
fn query_data(...) -> ? {
    // query servers
}

let stream = query_data(...);
async for token in stream {
    // process token
}
```
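You could use `StreamMap` from the `tokio-stream` extension crate. The Rust counterpart of an async generator is a type implementing `Stream`, so a function like `query_data` can return `impl Stream<Item = ...>`. `StreamMap` combines several such streams and yields each item, tagged with the key of the stream it came from, as soon as any of them produces one: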
```rust
// Assumed dependencies: tokio (features "macros", "rt-multi-thread", "time", "sync")
// and tokio-stream.
use std::time::Duration;
use tokio::time::sleep;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt, StreamMap};

// Simulates one server: a background task sends five tokens, one every
// `delay_millis` ms, into a bounded channel whose receiver is exposed as a Stream.
fn create_stream(name: &'static str, delay_millis: u64) -> impl Stream<Item = String> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);
    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(delay_millis)).await;
            // Stop producing if the consumer has dropped the stream.
            if sender.send(format!("{}-{}", name, i)).await.is_err() {
                break;
            }
        }
    });
    ReceiverStream::new(receiver)
}

// StreamMap yields `(key, item)` pairs from whichever inner stream is ready
// first, and ends once every inner stream has ended.
fn combine_streams<T>(stream_a: T, stream_b: T) -> impl Stream<Item = (&'static str, T::Item)>
where
    T: Stream + Unpin,
{
    let mut map = StreamMap::new();
    map.insert("a", stream_a);
    map.insert("b", stream_b);
    map
}

#[tokio::main]
async fn main() {
    let stream_a = create_stream("A", 200);
    let stream_b = create_stream("B", 277);
    let mut combined = combine_streams(stream_a, stream_b);
    // The Rust counterpart of `async for token in stream`.
    while let Some((origin, packet)) = combined.next().await {
        println!("Received from {origin}: {packet}");
    }
}
```
Output:

```text
Received from a: A-0
Received from b: B-0
Received from a: A-1
Received from b: B-1
Received from a: A-2
Received from a: A-3
Received from b: B-2
Received from a: A-4
Received from b: B-3
Received from b: B-4
```