Search code examples
rustrust-tokio

Listen to all responses from a Vec of WebSocketStream(s) in tokio select


I have multiple streams of type WebSocketStream<MaybeTlsStream<TcpStream>> bundled together in a Vec. I want to .send() a message to a single stream (chosen at random) and listen for responses from all the streams.

I can loop over the vector, pick one at random to .send() the message but how can I listen for responses from all of the streams? The issue is in Some(_) = ws.streams.next() => {}.

use tokio; // 1.35.1
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio::TcpStream;

struct WS {
    streams: Vec<WebSocketStream<MaybeTlsStream<TcpStream>>>,
};

impl WS {
    async fn connect(...) -> Self {
        ...
        Self {
            streams
        }
    }
}

async fn send_msg(ws: &mut WS, msg: String) {
    let random_stream = ws.streams.iter_mut().choose_multiple(&mut rand::thread_rng(), 1)[0];
    random_stream.send(..).await.unwrap();
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let mut ws = WS::connect(..);
    
    loop {
        tokio::select! {
            Some(msg) = rx.recv() => {
                send_msg(ws, msg.to_string()).await;
            }
            // Cannot call .next() here as we need a future
            Some(_) = ws.streams.next() => {
                // How do we receive all messages from all streams here?
            }
        }
    }
}

Solution

  • If your WebSocketStream<MaybeTlsStream<TcpStream>> implement futures::stream::Stream you can use futures::stream::select_all::SelectAll instead of Vec to hold your streams. Then you can use it in place of all the streams in tokio::select!, you can access the individual streams with iter_mut() just like you do with the Vec.