Search code examples
rustasync-awaitrust-tokio

Rust futures select in loop is not working as expected


I have a loop that receives UDP messages, to break this loop I used a channel that receives shutdown event with the select! macro. I expect to break the loop whenever I send the shutdown event (Ctrl+C). But it's not working as expected after receiving the first UDP message the shutdown is not being selected and not breaking the loop.

Here is the code and I will add the steps to see the same problem.

// [dependencies]
// tokio = { version = "1.23", features = ["full"] }
// tokio-stream = { version = "0.1" , features = ["sync"]}
// socket2 = "0.4"
// stream-cancel = "0.8"
// async-stream = "0.3"
// futures = "0.3"

use std::env::args;
use std::error::Error;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;

use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};

const DEFAULT_PORT: u16 = 10020;
const DEFAULT_MULTICAST: &str = "224.0.1.1";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mode = args().nth(1).unwrap_or_else(|| "server".to_string());

    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
    shutdown_watcher(shutdown_tx);

    return if mode == "server" {
        run_server(shutdown_rx).await
    } else {
        run_client().await
    };
}

async fn run_server(rx: Receiver<()>) -> Result<(), Box<dyn Error>> {
    let multicast = create_multicast()?;
    let multicast = UdpSocket::from_std(multicast)?;
    let multicast = Arc::new(multicast);
    println!("Start Discovery Server");
    start_discovery_server(multicast, rx).await;

    Ok(())
}

async fn run_client() -> Result<(), Box<dyn Error>> {
    let multicast = create_multicast()?;
    let multicast = UdpSocket::from_std(multicast)?;
    let multicast = Arc::new(multicast);
    let msg = "My message".to_string();
    let addr = SocketAddrV4::new(DEFAULT_MULTICAST.parse::<Ipv4Addr>()?, DEFAULT_PORT);
    let len = multicast.send_to(msg.as_bytes(), &addr).await?;
    println!("Client Sent {len} bytes.");
    Ok(())
}

fn create_multicast() -> Result<std::net::UdpSocket, Box<dyn Error>> {
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
    socket.set_reuse_address(true)?;
    let multiaddr = Ipv4Addr::new(224, 0, 1, 1);
    socket.set_multicast_loop_v4(true)?;
    let addr = SocketAddr::new("0.0.0.0".parse()?, DEFAULT_PORT);
    socket.bind(&SockAddr::from(addr))?;
    let interface = socket2::InterfaceIndexOrAddress::Index(0);
    socket.join_multicast_v4_n(&multiaddr, &interface)?;
    Ok(socket.into())
}

async fn read_new_message(udp: Arc<UdpSocket>) -> String {
    let mut buf = vec![0u8; 1024];
    let result = udp.recv_from(&mut buf).await;
    match result {
        Ok((len, _addr)) => {
            let msg = String::from_utf8_lossy(&buf[..len]);
            msg.to_string()
        }
        Err(_) => "Error!".to_string(),
    }
}

async fn start_discovery_server(udp: Arc<UdpSocket>, mut shutdown: Receiver<()>) {
    loop {
        tokio::select! {
            msg = read_new_message(udp.clone()) => {
                println!("Got: {msg:?}");
            }
            res = shutdown.recv() => {
                println!("Got {res:?} for shutdown");
                break
            }
            else => {
                println!("Both channels closed");
                break
            }
        }
        println!("loop");
    }
}

fn shutdown_watcher(tx: Sender<()>) {
    tokio::spawn(async move {
        println!("watcher started");
        let _ = tokio::signal::ctrl_c().await;
        let r = tx.send(());
        println!("got ctrl+C {r:?}");
    });
}

To see the problem first run the cargo run then in another terminal run cargo run --release -- c now press Ctrl+C in the first terminal the server should stop but it's not working. call the cargo run --release -- c again in second terminal and now the program stops in first terminal.

Can you help me to find the problem that prevents the (Ctrl+C) event from stopping the code immediately?


Solution

  • By default, sockets are blocking. And therefore, recv_from() blocks and the select! does not work.

    You should call socket.set_nonblocking(true) to mark the socket as non blocking so the I/O will be asynchronous.