Search code examples

Why does my Tokio server listening for UDP packets use 100% CPU until it's OOM-killed?

I am listening for UDP packets. Once the first packet arrives, I start listening for more packets. As soon as one packet is received, the server starts running at full speed (even though there should be a delay) and eventually gets killed:

use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::env;
use std::sync::Arc;

/// Length of a UDP header in bytes.
const UDP_HEADER: usize = 8;
/// Length of an IP header in bytes (presumably IPv4 without options — confirm).
const IP_HEADER: usize = 20;
/// NOTE(review): not referenced in the visible code; meaning unconfirmed.
const AG_HEADER: usize = 4;
/// Largest payload left once both headers are subtracted from 65535 bytes.
const MAX_DATA_LENGTH: usize = u16::MAX as usize - UDP_HEADER - IP_HEADER;
/// 65536; the visible server code passes this as the `mpsc` channel capacity.
const MAX_DATAGRAM_SIZE: usize = 1 << 16;
/// Fallback server address; empty in this listing.
const ADDRESS: &str = "";
/// Address the client binds to; empty in this listing.
const ADDRESS_CLIENT: &str = "";

// Entry point. NOTE(review): the body is not shown in this excerpt, and an
// `async fn main` needs a runtime attribute such as `#[tokio::main]` — confirm
// against the full source.
async fn main() {

// Binds a UDP socket, spawns a "debouncer" task that records which packet ids
// have arrived, then receives datagrams.
// NOTE(review): this listing has lost lines (closing braces, and the statements
// that the dangling `.expect(...)` calls belong to); restore them from the
// original before compiling.
async fn server() {
    eprintln!("Starting the server");
    // Bind address: first command-line argument, falling back to ADDRESS.
    let addr = env::args().nth(1).unwrap_or_else(|| ADDRESS.to_string());
    let socket = UdpSocket::bind(&addr).await.unwrap();
    // Wrapped in an Arc so clones of the handle can be moved into spawned tasks.
    let arc = Arc::new(socket);
    let mut buf = [0u8; MAX_DATA_LENGTH];
    // Channel of byte vectors read by the debouncer; its capacity is
    // MAX_DATAGRAM_SIZE messages (not bytes).
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<Vec<u8>>(MAX_DATAGRAM_SIZE);

    let _debouncer = task::spawn(async move {
        // One slot per expected packet id (0..=9); a slot is set to 1 once seen.
        let mut _packet_ids: Vec<i32> = Vec::new();
        _packet_ids = vec![0; 10];
        let duration = Duration::from_millis(1300);

        loop {
            // Wait at most 1.3 s for the next message; `Err` means the timeout elapsed.
            match time::timeout(duration, debounce_rx.recv()).await {
                Ok(Some(bytes)) => {
                    // The first byte is used as the packet id. Indexing panics on an
                    // empty message or an id >= 10.
                    let id: u8 = bytes.clone()[0];
                    _packet_ids[id as usize] = 1;
                    eprintln!("{} id packet received:{:?}", id, _packet_ids);
                    if _packet_ids.iter().all(|x| x == &1i32) {
                        println!("All packets have been received, stop program ");
                     //   break;
                // `recv` returned `None`: the channel is closed.
                Ok(None) => {
                    eprintln!("Done: {:?}", _packet_ids);
                Err(_) => {
                    eprintln!("No activity for 1.3sd");
    // Listen for first packet
    let result = arc.clone().recv_from(&mut buf).await;
    match result {
        Ok((len, addr)) => {
            eprintln!("Bytes len: {} from {}", len, addr);
                .expect("Unable to talk to debounce");
        Err(_) => {
            eprintln!("Couldnt get datagram");
    // listen for other packets
    // NOTE(review): as shown, each pass of this `loop` spawns a new task and nothing
    // visible awaits it, so tasks are presumably created without bound. That matches
    // the reported 100% CPU and growing memory.
    loop {
        let thread_socket = arc.clone();
        let _server = task::spawn({
            let debounce_tx = debounce_tx.clone();

            async move {
                // `while let` with a bare binding always matches, so this inner loop
                // never ends on its own.
                while let result = thread_socket.recv_from(&mut buf).await {
                    match result {
                        Ok((len, addr)) => {
                            eprintln!("Bytes len: {} from {}", len, addr);
                                .expect("Unable to talk to debounce");
                        Err(_) => {
                            eprintln!("Couldnt get datagram");
                 // Prevent deadlocks
// Binds a UDP socket and sends ten 3-byte datagrams to the server.
// NOTE(review): this listing has lost lines (the rest of the argument parsing and
// the end of the `if`/`else` below); restore them from the original before compiling.
async fn client() {
    eprintln!("Starting the client");

    let remote_addr: SocketAddr = env::args()
        .unwrap_or_else(|| ADDRESS.into()) // cargo run --example udp-client --

    // We use port 0 to let the operating system allocate an available port for us.
    let local_addr: SocketAddr = if remote_addr.is_ipv4() {
        ADDRESS_CLIENT // "" //
    } else {
    let socket = UdpSocket::bind(ADDRESS_CLIENT).await.unwrap();


    // Ten datagrams whose first byte runs from 0 to 9; the server's debouncer uses
    // that byte as the packet id.
    // NOTE(review): `send` requires a connected socket, and no `connect` call is
    // visible in this excerpt — confirm against the full source.
    socket.send(&[0, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[1, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[2, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[3, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[4, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[5, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[6, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[7, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[8, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[9, 2, 3]).await.expect("Unable to talk to network"); // stop when n1 = 0

    eprintln!("Client done");

I'm sending 10 packets from the client, whose first bytes range from 0 to 9. I am just trying to make it work for now; I am aware of the buffer overflow. Here's the server output:

Starting the server
Bytes len: 3 from
0 id packet received:[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Bytes len: 3 from
1 id packet received:[1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
Bytes len: 3 from
Bytes len: 3 from
Bytes len: 3 from
Bytes len: 3 from
Bytes len: 3 from
Bytes len: 3 from
2 id packet received:[1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
3 id packet received:[1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
4 id packet received:[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
5 id packet received:[1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
6 id packet received:[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
7 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
Bytes len: 3 from
8 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 1, 0]
No activity for 1.3sd
No activity for 1.3sd
Bytes len: 3 from
9 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
All packets have been received, stop program 
No activity for 1.3sd
No activity for 1.3sd
No activity for 1.3sd

How do I keep the server running smoothly without being killed?


  • Solved it thanks to comments from @Frxstrem and @transistor; see the comments in the code for the changes:

     // Revised receive loop: the datagram is awaited directly in the loop body, and
     // the `task::spawn` wrapper is commented out, so no new task is created per pass.
     // NOTE(review): this fragment has lost lines (closing braces, and the statement
     // that the dangling `.expect(...)` belongs to).
     loop {
            let thread_socket = arc.clone();
            let debounce_tx = debounce_tx.clone(); // moved up
           /* let _server = task::spawn({
                async move { */
                    // `if let` with a bare binding always matches, so exactly one
                    // datagram is awaited per pass of the outer loop.
                    if let result = thread_socket.recv_from(&mut buf).await { // previously while
                        match result {
                            Ok((len, addr)) => {
                                eprintln!("Bytes len: {} from {}", len, addr);
                                    .expect("Unable to talk to debounce");
                            Err(_) => {
                                eprintln!("Couldnt get datagram");
                     // Prevent deadlocks
             /*   }
            }); */