I am attempting to write a tokio::select! loop in which an Interval tick()s every second while the loop also listens for UDP messages arriving on a UdpFramed Stream.
When there are no messages, the Interval ticks just fine, but once a message is received, the loop seems to block on f.next(), and I don't understand why.
Shouldn't next() call poll_next() on the Stream and only yield an item if one is available? And if none is available, shouldn't it skip this select! arm and just keep on ticking?
use futures::StreamExt;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::io;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::time::interval;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
// Multicast constants.
// IP_ANY holds the four octets of 0.0.0.0; main converts it into the IPv4 address the socket is bound to.
const IP_ANY: [u8; 4] = [0, 0, 0, 0];
// Entry point of the reproduction: binds a UDP socket on 0.0.0.0:5353, joins the
// 224.0.0.251 multicast group, wraps the socket in a `UdpFramed` stream and then loops
// on `select!` between a 200 ms-timeout read of that stream and a 1 s interval tick.
// Every `?` below returns the `io::Error` of a failed socket call from `main`.
#[tokio::main]
async fn main() -> io::Result<()> {
pretty_env_logger::init();
// NOTE(review): `info!` is not brought into scope by the `use` lines shown with this
// snippet; presumably it is the `log` crate macro - confirm the import.
info!("Tokio Select Example");
// Create a UDP IPv4 socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
// Allow this address/port to be reused by other sockets
socket.set_reuse_address(true)?;
socket.set_reuse_port(true)?;
// Create the IPv4 "any" address (0.0.0.0) on port 5353
let address = SocketAddrV4::new(IP_ANY.into(), 5353);
println!("Created Address");
// Bind to wildcard 0.0.0.0
socket.bind(&SockAddr::from(address))?;
println!("Bound Socket");
// Join the 224.0.0.251 multicast group, passing the bind address's IP as the second argument
socket.join_multicast_v4(&Ipv4Addr::new(224, 0, 0, 251), address.ip())?;
println!("Joined Multicast");
// Convert to std::net udp socket
let udp_std_socket: std::net::UdpSocket = socket.into();
// Convert to tokio udp socket
// NOTE(review): nothing above calls `set_nonblocking(true)` on the socket, and
// `UdpSocket::from_std` is documented as leaving non-blocking mode to the caller;
// this is the likely cause of the stall described in the question.
let udp_socket = UdpSocket::from_std(udp_std_socket)?;
println!(
"Created a UDP Socket at {}, {}",
address.ip().to_string(),
address.port().to_string()
);
// Wrap the socket in a stream that yields received datagrams via `BytesCodec`
let mut f = UdpFramed::new(udp_socket, BytesCodec::new());
// Shadows the imported `interval` function with the value it returns (1 s period)
let mut interval = interval(Duration::from_secs(1));
loop {
// Run whichever arm completes first: the next frame (or its 200 ms timeout), or the tick
select! {
// `result` is the timeout's outcome wrapping whatever `f.next()` produced; it is printed with `{:?}`
result = tokio::time::timeout(Duration::from_millis(200), f.next()) => {
println!("{:?}", result);
}
// `default` is only a binding name for the value returned by `tick()`; the arm body does not use it
default = interval.tick() => {
println!("Tick!");
}
}
}
}
Quote from the documentation of UdpSocket::from_std():
This function is intended to be used to wrap a UDP socket from the standard library in the Tokio equivalent. The conversion assumes nothing about the underlying socket; it is left up to the user to set it in non-blocking mode.
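You are not setting the underlying socket to non-blocking mode. Once the socket has been reported readable, the next receive attempt made by f.next() therefore blocks the runtime thread instead of returning WouldBlock, so neither the timeout nor the interval arm can make progress until another datagram arrives.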
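This works (note the added socket.set_nonblocking(true) call; this version also binds to port 15253 and leaves out the multicast join):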
use futures::StreamExt;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::io;
use std::net::SocketAddrV4;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::time::interval;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
// Octets of the wildcard IPv4 address 0.0.0.0; main converts this into the address the socket is bound to.
const IP_ANY: [u8; 4] = [0, 0, 0, 0];
// Working variant of the example: binds a non-blocking UDP socket on 0.0.0.0:15253,
// wraps it in a `UdpFramed` stream and loops on `select!` between a 200 ms-timeout
// read of that stream and a 1 s interval tick. Any socket setup failure is returned
// from `main` as an `io::Error`.
#[tokio::main]
async fn main() -> io::Result<()> {
    println!("Tokio Select Example");

    // Plain IPv4 UDP socket, reusable by other sockets and, crucially, non-blocking:
    // `UdpSocket::from_std` leaves that mode for the caller to set.
    let raw_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    raw_socket.set_reuse_address(true)?;
    raw_socket.set_reuse_port(true)?;
    raw_socket.set_nonblocking(true)?;

    // Wildcard address 0.0.0.0 on port 15253
    let address = SocketAddrV4::new(IP_ANY.into(), 15253);
    println!("Created Address");

    raw_socket.bind(&SockAddr::from(address))?;
    println!("Bound Socket");

    // socket2 -> std -> tokio
    let std_socket: std::net::UdpSocket = raw_socket.into();
    let udp_socket = UdpSocket::from_std(std_socket)?;
    println!(
        "Created a UDP Socket at {}, {}",
        address.ip(),
        address.port()
    );

    let mut framed = UdpFramed::new(udp_socket, BytesCodec::new());
    let mut ticker = interval(Duration::from_secs(1));
    let read_timeout = Duration::from_millis(200);

    loop {
        println!("A");
        // Whichever arm completes first runs; the other is dropped and retried next iteration.
        select! {
            received = tokio::time::timeout(read_timeout, framed.next()) => {
                println!("{:?}", received);
            }
            _ = ticker.tick() => {
                println!("Tick!");
            }
        }
        println!("Z");
    }
}