actix minimal channel example


I'm trying to replicate a simple benchmark for async-channel in actix-web. However, actix seems never to call the recv function. How can this be?

Does the await in the sending thread never cause tokio to yield to the other thread? I thought await yielded the current context?

#[actix_web::main]
async fn main() {
    let (s, r) = async_channel::unbounded();

    let t1 = tokio::spawn(async move {
        let mut now = chrono::Utc::now();
        while let Ok(x) = r.recv().await {
            if x % 1 == 0 {
                let next_now = chrono::Utc::now();
                println!("{} {:?}", x, next_now - now);
                now = next_now;
            }
        }
    });

    let t2 = tokio::spawn(async move {
        for i in 1.. {
            s.send(i).await.unwrap();
        }
    });

    let _ = tokio::join!(t1, t2);
}

I tried taking each of the code blocks outside the spawn calls, and replacing "join!" with select and other futures calls (also getting rid of it entirely).

I'm also trying to figure out whether it's OK to use actix-web with tokio (specifically, tokio::spawn within actix-web and actix::rt::spawn within tokio::main).


Solution

  • for i in 1.. {
        s.send(i).await.unwrap();
    }
    

    This loop never yields. There is an await in it, but send() only suspends the task when the channel is full. Because you created the channel with unbounded(), it is never considered full; the process would run out of memory first. #[actix_web::main] runs its tasks on a single-threaded runtime, so as long as the sending task does not yield, the receiving task is never polled.
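To make the explanation above concrete, here is a minimal sketch that uses only the standard library. It is not how async-channel or tokio are implemented: `SendFut`, `NoopWaker` and `max_backlog` are hypothetical names, and the "executor" is just a loop that polls two tasks in turn on one thread. It only illustrates that an `.await` hands control back when the awaited future returns `Poll::Pending`, and not before.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing: the toy executor below re-polls every task anyway.
/// (A real channel would store the waker and call it when space frees up.)
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

type Queue = Rc<RefCell<VecDeque<usize>>>;

/// Hypothetical stand-in for a channel's send future.
/// `cap: None` models an unbounded channel, `Some(n)` a bounded one (n > 0).
struct SendFut {
    queue: Queue,
    cap: Option<usize>,
    value: usize,
}

impl Future for SendFut {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let mut queue = self.queue.borrow_mut();
        if let Some(cap) = self.cap {
            if queue.len() >= cap {
                // Full: only here does the sending task hand control back.
                return Poll::Pending;
            }
        }
        queue.push_back(self.value);
        Poll::Ready(())
    }
}

/// Polls a producer of `n` items and a consumer in turn on one thread and
/// returns the largest backlog the consumer ever found in the queue.
fn max_backlog(cap: Option<usize>, n: usize) -> usize {
    let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));

    let tx = Rc::clone(&queue);
    let mut producer = Box::pin(async move {
        for value in 0..n {
            SendFut { queue: Rc::clone(&tx), cap, value }.await;
        }
    });

    let mut received = 0usize;
    let mut max_seen = 0usize;
    let mut consumer = Box::pin(poll_fn(move |_cx| {
        let mut pending = queue.borrow_mut();
        max_seen = max_seen.max(pending.len());
        received += pending.drain(..).count();
        if received == n { Poll::Ready(max_seen) } else { Poll::Pending }
    }));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut producer_done = false;
    loop {
        // Round robin: the consumer only runs once the producer's poll returns.
        if !producer_done {
            producer_done = producer.as_mut().poll(&mut cx).is_ready();
        }
        if let Poll::Ready(max) = consumer.as_mut().poll(&mut cx) {
            return max;
        }
    }
}
```

With these definitions, `max_backlog(None, 1000)` returns 1000: the producer pushes every item in a single poll before the consumer gets a turn, and with an endless `1..` loop that turn would never come. `max_backlog(Some(10), 1000)` returns 10, because the producer is suspended each time the queue fills up.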

    Try changing your channel to a bounded one, or try to sleep in the loop:

    let t2 = tokio::spawn(async move {
        for i in 1.. {
            s.send(i).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    });
    

    If I had to choose, I'd make the channel bounded. Having an infinite generator combined with an unbounded channel is always a bad idea.