Search code examples
rustasync-awaitudprust-tokio

Why does `tokio::main` report the error "cycle detected when processing"?


I'm using Tokio and async/.await to create an UDP server where I can receive and send data in an asynchronous way.

The SendHalf of my UDP socket is shared across more than one task. To do this I’m using Arc<Mutex<SendHalf>>. That's why Arc<Mutex<_>> exists.

use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
            let packet = Packet {
                sender: sender.clone(),
                buf: buffer,
                addr: src,
            };
            tx.send(packet).await;
        }
    });

    while let Some(packet) = rx.recv().await {
        tokio::spawn(async move {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        });
    }
}

Here is also a Playground.

I'm facing a compiler error that I don't understand:

error[E0391]: cycle detected when processing `main`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^
   |
note: ...which requires processing `main::{{closure}}#0::{{closure}}#1`...
  --> src/main.rs:34:33
   |
34 |           tokio::spawn(async move {
   |  _________________________________^
35 | |             let mut socket = packet.sender.lock().unwrap();
36 | |             socket.send_to(&packet.buf, &packet.addr).await.unwrap();
37 | |         });
   | |_________^
   = note: ...which again requires processing `main`, completing the cycle
note: cycle used when processing `main::{{closure}}#0`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^

Why is my code producing a cycle? Why does the call require processing main?

What does the error mean in more detail? I want to understand whats going on.


Solution

  • According to tokio documentation, when it comes to using !Send value from a task:

    Holding on to a !Send value across calls to .await will result in an unfriendly compile error message similar to:

    [... some type ...] cannot be sent between threads safely

    or:

    error[E0391]: cycle detected when processing main

    You are witnessing this exact error. When you lock a Mutex:

    pub fn lock(&self) -> LockResult<MutexGuard<T>>
    

    it returns a MutexGuard, which is !Send:

    impl<'_, T: ?Sized> !Send for MutexGuard<'_, T>
    

    This compiles fine:

    #[tokio::main]
    async fn main() {
        ...
    
        while let Some(packet) = rx.recv().await {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        }
    }