I have a loop that receives UDP messages, to break this loop I used a channel that receives shutdown event with the select!
macro. I expect to break the loop whenever I send the shutdown event (Ctrl+C). But it's not working as expected after receiving the first UDP message the shutdown is not being selected and not breaking the loop.
Here is the code and I will add the steps to see the same problem.
// [dependencies]
// tokio = { version = "1.23", features = ["full"] }
// tokio-stream = { version = "0.1" , features = ["sync"]}
// socket2 = "0.4"
// stream-cancel = "0.8"
// async-stream = "0.3"
// futures = "0.3"
use std::env::args;
use std::error::Error;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
const DEFAULT_PORT: u16 = 10020;
const DEFAULT_MULTICAST: &str = "224.0.1.1";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mode = args().nth(1).unwrap_or_else(|| "server".to_string());
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
shutdown_watcher(shutdown_tx);
return if mode == "server" {
run_server(shutdown_rx).await
} else {
run_client().await
};
}
async fn run_server(rx: Receiver<()>) -> Result<(), Box<dyn Error>> {
let multicast = create_multicast()?;
let multicast = UdpSocket::from_std(multicast)?;
let multicast = Arc::new(multicast);
println!("Start Discovery Server");
start_discovery_server(multicast, rx).await;
Ok(())
}
async fn run_client() -> Result<(), Box<dyn Error>> {
let multicast = create_multicast()?;
let multicast = UdpSocket::from_std(multicast)?;
let multicast = Arc::new(multicast);
let msg = "My message".to_string();
let addr = SocketAddrV4::new(DEFAULT_MULTICAST.parse::<Ipv4Addr>()?, DEFAULT_PORT);
let len = multicast.send_to(msg.as_bytes(), &addr).await?;
println!("Client Sent {len} bytes.");
Ok(())
}
fn create_multicast() -> Result<std::net::UdpSocket, Box<dyn Error>> {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
socket.set_reuse_address(true)?;
let multiaddr = Ipv4Addr::new(224, 0, 1, 1);
socket.set_multicast_loop_v4(true)?;
let addr = SocketAddr::new("0.0.0.0".parse()?, DEFAULT_PORT);
socket.bind(&SockAddr::from(addr))?;
let interface = socket2::InterfaceIndexOrAddress::Index(0);
socket.join_multicast_v4_n(&multiaddr, &interface)?;
Ok(socket.into())
}
async fn read_new_message(udp: Arc<UdpSocket>) -> String {
let mut buf = vec![0u8; 1024];
let result = udp.recv_from(&mut buf).await;
match result {
Ok((len, _addr)) => {
let msg = String::from_utf8_lossy(&buf[..len]);
msg.to_string()
}
Err(_) => "Error!".to_string(),
}
}
async fn start_discovery_server(udp: Arc<UdpSocket>, mut shutdown: Receiver<()>) {
loop {
tokio::select! {
msg = read_new_message(udp.clone()) => {
println!("Got: {msg:?}");
}
res = shutdown.recv() => {
println!("Got {res:?} for shutdown");
break
}
else => {
println!("Both channels closed");
break
}
}
println!("loop");
}
}
fn shutdown_watcher(tx: Sender<()>) {
tokio::spawn(async move {
println!("watcher started");
let _ = tokio::signal::ctrl_c().await;
let r = tx.send(());
println!("got ctrl+C {r:?}");
});
}
To see the problem first run the cargo run
then in another terminal run cargo run --release -- c
now press Ctrl+C in the first terminal the server should stop but it's not working. call the cargo run --release -- c
again in second terminal and now the program stops in first terminal.
Can you help me to find the problem that prevents the (Ctrl+C) event from stopping the code immediately?
By default, sockets are blocking. And therefore, recv_from()
blocks and the select!
does not work.
You should call socket.set_nonblocking(true)
to mark the socket as non blocking so the I/O will be asynchronous.