I have a Vec<tokio::sync::broadcast::Receiver<String>>
(basically a vector of channel receivers). I want to subscribe to all of them and get messages from all of them. How can I do it?
broadcast::Receiver
is not a stream yet, it's just an object that has a recv()
function. To combine multiple of them, you have to convert them to streams first.
Luckily, there is the tokio-stream
crate (imported as tokio_stream) for just that: its BroadcastStream wrapper turns a broadcast::Receiver into a stream.
Once the receivers are converted to streams, you can use futures::stream::select_all
to combine them:
use futures::stream::select_all;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;
/// Merges several broadcast receivers into a single stream and prints every
/// message as it arrives.
///
/// Two broadcast channels are created; a background task alternates sends
/// between them while the main task consumes the merged stream. The loop ends
/// once every sender has been dropped and all buffered messages are drained.
#[tokio::main]
async fn main() {
    let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
    let (sender2, receiver2) = tokio::sync::broadcast::channel(5);
    let receivers = vec![receiver1, receiver2];

    // Send on all channels
    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(50)).await;
            // `send` only fails when no receiver exists; the receivers live
            // inside the merged stream until both senders are dropped.
            sender1
                .send(format!("A{i}"))
                .expect("receiver 1 is alive while the merged stream is polled");
            sleep(Duration::from_millis(50)).await;
            sender2
                .send(format!("B{i}"))
                .expect("receiver 2 is alive while the merged stream is polled");
        }
        // Both senders are dropped here, which closes the channels and lets
        // the merged stream terminate.
    });

    // Receive on all channels simultaneously
    let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
    while let Some(item) = fused_streams.next().await {
        match item {
            Ok(value) => println!("Got value: {value}"),
            // A receiver that falls behind the channel capacity yields a
            // "lagged" error instead of the skipped messages. The stream is
            // still usable afterwards, so report it and keep receiving
            // rather than panicking.
            Err(err) => eprintln!("Receive error: {err}"),
        }
    }
}
Got value: A0
Got value: B0
Got value: A1
Got value: B1
Got value: A2
Got value: B2
Got value: A3
Got value: B3
Got value: A4
Got value: B4