Search code examples
rustwebsocketrust-tokio

Create multiple connections to a websocket in tokio-tungstenite using multiple local IPs


I'm retrieving all the local IP addresses and I want to create websocket connections from each of these IPs.

How do I cycle through all ips and create multiple streams?

use std::net::{IpAddr, Ipv4Addr};
use tokio_tungstenite::connect_async;

#[tokio::main]
async fn main() {

    let ips: Vec<IpAddr> = vec![
        IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)),
        IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2))
    ];

    // How do I cycle through all `ips` and create multiple `stream`s?
    let (mut stream, _) = connect_async("wss://echo.websocket.org").await.unwrap();

    loop {
        tokio::select! {
            Some(msg) = stream.next() => {
                println!("{}", msg);
            }
        }
    }
}

Solution

  • To be able to determine your local address you have to manually create the TcpStream and switch to client_async instead of connect_async:

    use futures::{stream::SelectAll, StreamExt};
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    use tokio::net::{lookup_host, TcpSocket};
    use tokio_tungstenite::client_async;
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // we set the ports here to 0 so the OS can pick one for us
        let ips: Vec<SocketAddr> = vec![
            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 0)),
            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 0)),
        ];
        let mut streams = SelectAll::new();
        // get the first remote address you might want to try them one after the other or maybe `zip` them with the ones from `ips`, …
        let target = lookup_host("echo.websocket.org:443")
            .await?
            .next()
            .ok_or("no IP for \"echo.websocket.org:443\"")?;
        for ip in ips {
            let socket = TcpSocket::new_v4()?;
            socket.bind(ip)?;
            let tcp_stream = socket.connect(target).await?;
    
            let (ws_stream, _) = client_async("wss://echo.websocket.org", tcp_stream)
                .await?;
            streams.push(ws_stream);
        }
    
        loop {
            tokio::select! {
                Some(msg) = streams.next() => {
                    println!("{:?}", msg);
                }
            }
        }
    }
    

    I'm using SelectAll to join all the streams into one here, you might want to replace it with a Vec to have easy access to all the different streams individually. Or you could use SelectAll::iter(_mut)