Tags: asynchronous, rust, rust-tokio, crossbeam

Why does tokio::spawn have a delay when called next to crossbeam_channel::select?


I'm creating a task that spawns other tasks. Some of them take a long time, so I can't await them, but they can run in parallel:

src/main.rs

use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}", loop_id);
            }); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
            println!("loop {}", loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}", loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}", loop_id);
                    //         });
                    //     },
                    //     Err(e) => println!("{}", e),
                    // };
                },
                // more recv(some_channel) -> 
            }
            counter += 1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}

I've noticed strange behavior. This outputs:

loop 0

I was expecting it to also output inner task 0.

If I send a value to the channel, the output will be:

loop 0
inner task 0
loop 1

This is missing inner task 1.

Why does the inner task run one loop iteration late?

I first noticed this behavior with 'received from channel task', which was delayed by one loop iteration, but when I reduced the code to prepare a sample, it started happening with 'inner task' instead. It may be worth mentioning that if I write a second tokio::spawn right after the first, only the last one has this issue. Is there something I should be aware of when calling tokio::spawn together with select!? What causes this one-iteration delay?

Cargo.toml dependencies

[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"

Rust 1.46, Windows 10


Solution

  • crossbeam's select! blocks the calling thread, and the docs for tokio::spawn say:

    The spawned task may execute on the current thread, or it may be sent to a different thread to be executed.

    In this case, select! is not a future at all: it is a blocking call, and spawn does not guarantee a new thread (neither for the outer task nor for the one spawned inside the loop). Because you never tell tokio that you are going to block, tokio sees no reason to use another thread: from its perspective, you only have three futures that should never block. The inner task therefore waits behind the blocked outer task until select! returns.

    The solution is to run the select! loop with tokio::task::spawn_blocking (it takes a closure rather than a future, so async move {} becomes move || {}). Tokio then knows that this function actually blocks and moves it to another thread, while keeping all the actual futures on its other execution threads.

    use crossbeam::crossbeam_channel::{bounded, select};
    
    #[tokio::main]
    async fn main() {
        let (s, r) = bounded::<usize>(1);
    
        tokio::task::spawn_blocking(move || {
            // ...
        });
    
        loop {
            // rest of the program
        }
    }
    


    Another possible solution is to use an async channel such as tokio::sync::mpsc, whose recv() you can await, which gives the expected behavior. You can either await recv() directly or use tokio::select!, like this:

    use tokio::sync::mpsc;
    
    #[tokio::main]
    async fn main() {
        let (mut s, mut r) = mpsc::channel::<usize>(1);
    
        tokio::spawn(async move {
            loop {
                // ...
                tokio::select! {
                    Some(i) = r.recv() => {
                        println!("got = {}", i);
                    }
                }
            }
        });
    
        loop {
            // rest of the program
        }
    }
    

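The starvation described in the answer can be modeled without tokio. The following is a minimal std-only sketch, not tokio's scheduler: a single worker thread runs queued jobs one at a time, std::sync::mpsc's recv() stands in for crossbeam's select!, and a plain thread::spawn stands in for spawn_blocking. The names Job, run_model, and offload are hypothetical and chosen only for illustration.

```rust
use std::sync::mpsc::channel;
use std::thread;

// A queued unit of work; stands in for a spawned task in this model.
type Job = Box<dyn FnOnce() + Send>;

/// Runs the model and returns the events in the order they happened.
/// `offload == false`: the outer job blocks the only worker on `recv()`.
/// `offload == true`: the blocking `recv()` runs on its own thread instead.
fn run_model(offload: bool) -> Vec<String> {
    let (job_tx, job_rx) = channel::<Job>();
    let (data_tx, data_rx) = channel::<usize>();
    let (ready_tx, ready_rx) = channel::<()>();
    let (log_tx, log_rx) = channel::<String>();

    // The single worker: runs jobs in order until every job sender is dropped.
    let worker = thread::spawn(move || {
        for job in job_rx {
            job();
        }
    });

    let queue = job_tx.clone();
    let outer_log = log_tx.clone();
    let outer: Job = Box::new(move || {
        outer_log.send("loop 0".to_string()).unwrap();

        // Queue the inner job behind the outer one on the same worker.
        let inner_log = outer_log.clone();
        let inner_ready = ready_tx.clone();
        let inner: Job = Box::new(move || {
            inner_log.send("inner task 0".to_string()).unwrap();
            let _ = inner_ready.send(());
        });
        queue.send(inner).unwrap();

        // The blocking wait, standing in for crossbeam's select!.
        let wait = move || {
            let value = data_rx.recv().unwrap();
            outer_log.send(format!("received {}", value)).unwrap();
        };
        if offload {
            // Analogous to spawn_blocking: the worker is free to continue.
            thread::spawn(wait);
        } else {
            // Tell main we are about to block, then block the worker itself.
            let _ = ready_tx.send(());
            wait();
        }
    });
    job_tx.send(outer).unwrap();
    drop(job_tx);

    // Wait for the first "ready" signal, then send the value.
    ready_rx.recv().unwrap();
    log_tx.send("send 2".to_string()).unwrap();
    data_tx.send(2).unwrap();
    drop(log_tx);

    // Collecting ends once every log sender has been dropped.
    let events: Vec<String> = log_rx.iter().collect();
    worker.join().unwrap();
    events
}

fn main() {
    println!("blocking on the worker: {:?}", run_model(false));
    println!("blocking offloaded:     {:?}", run_model(true));
}
```

With offload set to false, "inner task 0" is logged only after the value has been sent and received, which matches the one-iteration delay in the question. With offload set to true, "inner task 0" is logged before the send, which is the effect that moving the blocking loop into spawn_blocking is meant to achieve.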