Tags: asynchronous, rust, timer, rust-tokio

How can I create a Tokio timer to debounce reception of network packets?


Problem

I have implemented a function that returns an array of the indexes of missing packets. If 100 packets are sent, the server will have a vector of 100 entries, each marked 0 (missing) or 1 (not missing). I don't want to run this check every time a packet arrives, only when there is a short pause during which no packet is received. I want to change my synchronous function into an asynchronous debouncing function.
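
For concreteness, a function of this kind might look like the sketch below. The name `missing_indexes` and the `u8` flag layout are assumptions made for illustration; they are not taken from the original code.

```rust
/// Hypothetical helper: returns the indexes whose flag is 0 (packet missing).
/// Assumes `received[i] == 1` means packet `i` arrived and `0` means it did not.
pub fn missing_indexes(received: &[u8]) -> Vec<usize> {
    received
        .iter()
        .enumerate()
        // Keep only the positions whose flag marks the packet as missing.
        .filter_map(|(index, &flag)| if flag == 0 { Some(index) } else { None })
        .collect()
}
```

Scanning the whole vector on every packet is wasted work while a burst is still arriving, which is why the check is worth debouncing.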

My attempt to solve the debouncing issue

I am looking for a way to implement a timer (for example, 300 ms) whose value is constantly overwritten by different threads. Once its value is no longer being overwritten, it should trigger a block of code or a function. I am using Tokio.

This is pseudocode for what I want to achieve:

use std::time::{SystemTime, UNIX_EPOCH};

// thanks https://stackoverflow.com/questions/26593387/how-can-i-get-the-current-time-in-milliseconds
fn get_epoch() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis()
}

impl Server {
    async fn run(self) -> Result<(), io::Error> {
        let Server {
            socket,
            mut buf,
            mut to_send,
        } = self;

        let mut timer_delay = get_epoch();

        loop {
            if let Some((size, peer)) = to_send {
                timer_delay = get_epoch(); // "reset" the timer to a closer value
            }

            futures::join!(
                /* execute a block of code if true*/
                if get_epoch() - timer_delay > 300,
                /* else (default case):*/
                to_send = Some(socket.recv_from(&mut buf))
            );
        }
    }
}

I based my project on the following example from Tokio:

impl Server {
    async fn run(self) -> Result<(), io::Error> {
        let Server {
            socket,
            mut buf,
            mut to_send,
        } = self;

        loop {
            // First we check to see if there's a message we need to echo back.
            // If so then we try to send it back to the original source, waiting
            // until it's writable and we're able to do so.
            if let Some((size, peer)) = to_send {
                let amt = socket.send_to(&buf[..size], &peer).await?;

                println!("Echoed {}/{} bytes to {}", amt, size, peer);
            }

            // If we're here then `to_send` is `None`, so we take a look for the
            // next message we're going to echo back.
            to_send = Some(socket.recv_from(&mut buf).await?);
        }
    }
}

Solution

  • Spawn another Tokio task for debouncing that will listen to a channel. You can tell when the channel hasn't received anything in a while by using a timeout. When the timeout occurs, that's the signal that you should perform your infrequent action. Don't forget to perform that action when the channel closes as well:

    use std::time::Duration;
    use tokio::{sync::mpsc, task, time}; // 1.3.0
    
    #[tokio::main]
    async fn main() {
        let (debounce_tx, mut debounce_rx) = mpsc::channel(10);
        let (network_tx, mut network_rx) = mpsc::channel(10);
    
        // Listen for events
        let debouncer = task::spawn(async move {
            let duration = Duration::from_millis(10);
    
            loop {
                match time::timeout(duration, debounce_rx.recv()).await {
                    Ok(Some(())) => {
                        eprintln!("Network activity")
                    }
                    Ok(None) => {
                        eprintln!("Debounce finished");
                        break;
                    }
                    Err(_) => {
                        eprintln!("{:?} since network activity", duration)
                    }
                }
            }
        });
    
        // Listen for network activity
        let server = task::spawn({
            let debounce_tx = debounce_tx.clone();
            async move {
                while let Some(packet) = network_rx.recv().await {
                    // Received a packet
                    debounce_tx
                        .send(())
                        .await
                        .expect("Unable to talk to debounce");
                    eprintln!("Received a packet: {:?}", packet);
                }
            }
        });
    
        // Prevent deadlocks
        drop(debounce_tx);
    
        // Drive the network input
        network_tx.send(1).await.expect("Unable to talk to network");
        network_tx.send(2).await.expect("Unable to talk to network");
        network_tx.send(3).await.expect("Unable to talk to network");
    
        time::sleep(Duration::from_millis(20)).await;
    
        network_tx.send(4).await.expect("Unable to talk to network");
        network_tx.send(5).await.expect("Unable to talk to network");
        network_tx.send(6).await.expect("Unable to talk to network");
    
        time::sleep(Duration::from_millis(20)).await;
    
        // Close the network
        drop(network_tx);
    
        // Wait for everything to finish
        server.await.expect("Server panicked");
        debouncer.await.expect("Debouncer panicked");
    }
    
    Running this prints output like the following:

    Received a packet: 1
    Received a packet: 2
    Received a packet: 3
    Network activity
    Network activity
    Network activity
    10ms since network activity
    10ms since network activity
    Received a packet: 4
    Received a packet: 5
    Received a packet: 6
    Network activity
    Network activity
    Network activity
    10ms since network activity
    Debounce finished
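
In the output above, "10ms since network activity" appears twice after the first burst, because the timeout keeps elapsing while the channel stays idle. If the infrequent action should run only once per burst, one option is to track whether a packet has arrived since the last time it ran. The following is a minimal, runtime-agnostic sketch of that bookkeeping. The `Debounce` type and its methods are hypothetical names introduced here; they are not part of Tokio or of the answer's code.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not a Tokio type): remembers when the most recent
/// packet arrived and reports the end of a burst exactly once.
pub struct Debounce {
    quiet: Duration,
    last_packet: Option<Instant>, // `None` means no action is pending
}

impl Debounce {
    pub fn new(quiet: Duration) -> Self {
        Self { quiet, last_packet: None }
    }

    /// Record a packet seen at `now`; this restarts the quiet period.
    pub fn touch(&mut self, now: Instant) {
        self.last_packet = Some(now);
    }

    /// True if a packet has arrived since the action last fired.
    pub fn is_pending(&self) -> bool {
        self.last_packet.is_some()
    }

    /// Returns `true` once per burst, when at least `quiet` has passed since
    /// the last recorded packet. Further calls return `false` until `touch`
    /// is called again.
    pub fn poll(&mut self, now: Instant) -> bool {
        match self.last_packet {
            Some(last) if now.saturating_duration_since(last) >= self.quiet => {
                self.last_packet = None; // disarm so the action fires only once
                true
            }
            _ => false,
        }
    }
}
```

In the debouncer task, one could call `touch(Instant::now())` in the `Ok(Some(()))` arm, run the infrequent action only when `poll(Instant::now())` returns `true` in the `Err(_)` arm, and check `is_pending()` in the `Ok(None)` arm so that a final burst is still handled when the channel closes. `Instant` is used rather than `SystemTime` because it is monotonic, so the elapsed time cannot go backwards if the system clock is adjusted.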