Search code examples
rustudprust-tokio

Select! between Interval and UdpFramed next() blocks


I am attempting to have a tokio::select! loop where I want an Interval to tick() every second and listen for Udp messages to come in on a UdpFramed Stream

When there are no messages, the Interval ticks just fine, but when a message is received, it seems like the loop is blocking on f.next() but I don't understand why.

Shouldn't next() call poll_next() on the Stream and only wait for the next item if it is available? And thus shouldn't it skip this select! arm and just keep on ticking?

use futures::StreamExt;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::io;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::time::interval;

use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

//MULTICAST Constants
const IP_ANY: [u8; 4] = [0, 0, 0, 0];

#[tokio::main]
async fn main() -> io::Result<()> {
    pretty_env_logger::init();
    info!("Tokio Select Example");
//Create a udp ip4 socket
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;

//Allow this port to be reused by other sockets
socket.set_reuse_address(true)?;
socket.set_reuse_port(true)?;

//Create IPV4 any adress
let address = SocketAddrV4::new(IP_ANY.into(), 5353);

println!("Created Address");

//Bind to wildcard 0.0.0.0
socket.bind(&SockAddr::from(address))?;

println!("Bound Socket");

//Join multicast group
socket.join_multicast_v4(&Ipv4Addr::new(224, 0, 0, 251), address.ip())?;

println!("Joined Multicast");

//Convert to std::net udp socket
let udp_std_socket: std::net::UdpSocket = socket.into();

//Convert to tokio udp socket
let udp_socket = UdpSocket::from_std(udp_std_socket)?;

println!(
    "Created a UDP Socket at {}, {}",
    address.ip().to_string(),
    address.port().to_string()
);

let mut f = UdpFramed::new(udp_socket, BytesCodec::new());
let mut interval = interval(Duration::from_secs(1));

loop {
    select! {

        result = tokio::time::timeout(Duration::from_millis(200), f.next()) => {
            println!("{:?}", result);
        }
        default = interval.tick() => {
            println!("Tick!");
        }
        }
    }
}

Solution

  • Quote from the documentation of UdpSocket::from_std():

    This function is intended to be used to wrap a UDP socket from the standard library in the Tokio equivalent. The conversion assumes nothing about the underlying socket; it is left up to the user to set it in non-blocking mode.

    You are not setting the underlying socket in non-blocking mode.

    This works:

    use futures::StreamExt;
    use socket2::{Domain, Protocol, SockAddr, Socket, Type};
    use std::io;
    use std::net::SocketAddrV4;
    use std::time::Duration;
    use tokio::net::UdpSocket;
    use tokio::select;
    use tokio::time::interval;
    
    use tokio_util::codec::BytesCodec;
    use tokio_util::udp::UdpFramed;
    
    //MULTICAST Constants
    const IP_ANY: [u8; 4] = [0, 0, 0, 0];
    
    #[tokio::main]
    async fn main() -> io::Result<()> {
        println!("Tokio Select Example");
        //Create a udp ip4 socket
        let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    
        //Allow this port to be reused by other sockets
        socket.set_reuse_address(true)?;
        socket.set_reuse_port(true)?;
        socket.set_nonblocking(true)?;
    
        //Create IPV4 any adress
        let address = SocketAddrV4::new(IP_ANY.into(), 15253);
        println!("Created Address");
    
        //Bind to wildcard 0.0.0.0
        socket.bind(&SockAddr::from(address))?;
        println!("Bound Socket");
    
        //Convert to tokio udp socket
        let udp_socket = UdpSocket::from_std(socket.into())?;
    
        println!(
            "Created a UDP Socket at {}, {}",
            address.ip().to_string(),
            address.port().to_string()
        );
    
        let mut f = UdpFramed::new(udp_socket, BytesCodec::new());
        let mut interval = interval(Duration::from_secs(1));
    
        loop {
            println!("A");
            select! {
                result = tokio::time::timeout(Duration::from_millis(200), f.next()) => {
                    println!("{:?}", result);
                }
                _ = interval.tick() => {
                    println!("Tick!");
                }
            }
            println!("Z");
        }
    }