Search code examples
rustrust-tokio

rust receive messages from multiple channels at once


I have a Vec<tokio::sync::broadcast::Receiver<String>> (basically a vector of channel receivers). I want to subscribe to all of them and get messages from all of them. How can I do it?


Solution

  • broadcast::Receiver is not a stream yet, it's just an object that has a recv() function. To combine multiple of them, you have to convert them to streams first.

    Luckily, there is the tokio-streams crate for just that.

    Once the receivers are converted to streams, you can use futures::stream::select_all to combine them:

    use futures::stream::select_all;
    use futures::StreamExt;
    use tokio::time::{sleep, Duration};
    use tokio_stream::wrappers::BroadcastStream;
    
    #[tokio::main]
    async fn main() {
        let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
        let (sender2, receiver2) = tokio::sync::broadcast::channel(5);
    
        let receivers = vec![receiver1, receiver2];
    
        // Send on all channels
        tokio::spawn(async move {
            for i in 0..5 {
                sleep(Duration::from_millis(50)).await;
                sender1.send(format!("A{i}")).unwrap();
                sleep(Duration::from_millis(50)).await;
                sender2.send(format!("B{i}")).unwrap();
            }
        });
    
        // Receive on all channels simultaneously
        let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
        while let Some(value) = fused_streams.next().await {
            println!("Got value: {}", value.unwrap());
        }
    }
    
    Got value: A0
    Got value: B0
    Got value: A1
    Got value: B1
    Got value: A2
    Got value: B2
    Got value: A3
    Got value: B3
    Got value: A4
    Got value: B4