I have multiple streams of type WebSocketStream<MaybeTlsStream<TcpStream>> bundled together in a Vec. I want to .send() a message to a single stream (chosen at random) and listen for responses from all the streams.
I can loop over the vector and pick one at random to .send() the message, but how can I listen for responses from all of the streams? The issue is in Some(_) = ws.streams.next() => {}.
use futures::SinkExt; // provides .send() on the WebSocket streams
use rand::seq::IteratorRandom; // provides .choose() on iterators (rand 0.8 assumed)
use tokio; // 1.35.1
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

struct WS {
    streams: Vec<WebSocketStream<MaybeTlsStream<TcpStream>>>,
}

impl WS {
    async fn connect(...) -> Self {
        ...
        Self {
            streams
        }
    }
}

async fn send_msg(ws: &mut WS, msg: String) {
    // Pick a single stream at random and send on it.
    let random_stream = ws.streams.iter_mut().choose(&mut rand::thread_rng()).unwrap();
    random_stream.send(..).await.unwrap();
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let mut ws = WS::connect(..).await;
    loop {
        tokio::select! {
            Some(msg) = rx.recv() => {
                send_msg(&mut ws, msg.to_string()).await;
            }
            // Cannot call .next() here as we need a future
            Some(_) = ws.streams.next() => {
                // How do we receive all messages from all streams here?
            }
        }
    }
}
If your WebSocketStream<MaybeTlsStream<TcpStream>> implements futures::stream::Stream, you can use futures::stream::select_all::SelectAll instead of Vec to hold your streams. You can then use it in tokio::select! in place of all the individual streams, and you can still access the individual streams with iter_mut(), just like you do with the Vec.