Search code examples
multithreadingrustasync-awaitrust-tokio

Tokio tasks sharing a thread


I am surprised by the behaviour of this code. Note I am intentionally sleeping with std sleep in async code so there are no .await points.

use std::time::Duration;
use tokio::sync::mpsc;

async fn send(tx: mpsc::Sender<u32>) {
    loop {
        std::thread::sleep(Duration::from_secs(1));
        tx.try_send(42).unwrap();
        println!("sent");
    }
}

async fn recv(mut rx: mpsc::Receiver<u32>) {
    while let Some(_) = rx.recv().await {
        println!("recv");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    tokio::spawn(send(tx.clone()));
    tokio::spawn(recv(rx));
    
    // let it run for a bit to see the channel fill up and send task to panic
    tokio::time::sleep(Duration::from_secs(100)).await;
}

What I'm surprised by is that the receiver doesn't get a chance to recv().await until the channel fills up causing the send task to panic.

The reason I'm surprised is I have more than 2 cores on my computer and I'm aware the Tokio runtime defaults to as many core threads as cpu cores, so are these two tasks not run on separate threads? This causes me to suspect the tasks are being run on one core thread.

Do all tasks share a core thread until the runtime decides a new one is needed? Does it spawn all at start up (n=num cpu cores) but still run tasks on one thread until more are required? Naively I assumed task 1 would get core thread 1, then task 2 would get core thread 2. (Obviously in this pathological example the programmer has violated the minimal requirement of async code: not to block.)


Solution

  • There is no guarantee whatsoever which thread a spawned task ends up on. And for relatively few task (like say 2) it's likely they'll end up on the same thread, you can check that by simply asking for the current threads id:

    std::thread::current().id()
    

    With that you can check wether task are being put on different threads:

    async fn send(tx: mpsc::Sender<u32>) {
        loop {
            std::thread::sleep(Duration::from_secs(1));
            tx.try_send(42).unwrap();
            println!("{:?} sent", std::thread::current().id());
        }
    }
    

    and an equivalent modification to recv gives me for example the following output on my machine:

    ThreadId(3) sent
    ThreadId(3) sent
    ThreadId(3) sent
    ThreadId(3) sent
    thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: "Full(..)"', src/m
    ain.rs:7:25
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    ThreadId(3) recv
    ThreadId(3) recv
    ThreadId(3) recv
    ThreadId(3) recv
    

    So yes indeed the tasks are spawned onto the same os thread.

    You can read about why another thread isn't able to steal the recv task in the documentation on disable_lifo_slot you have to enable tokio_unstable to use it. Indeed the version with the slot disabled does distribute the tasks as you'd expect:

    fn main() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .disable_lifo_slot()
            .enable_time()
            .build()
            .unwrap();
        rt.block_on(async {
            println!("{:?}", std::thread::current().id());
            let (tx, rx) = mpsc::channel(4);
            tokio::task::spawn(send(tx));
            tokio::spawn(recv(rx)).await.unwrap();
        });
    }
    
    ThreadId(5) sent
    ThreadId(4) recv
    ThreadId(5) sent
    ThreadId(3) recv
    ThreadId(5) sent
    ThreadId(4) recv
    ThreadId(5) sent
    …
    

    That's not usually a problem if like required all tasks cooperate (ie yield by using await at some time during their work) or you spawn the actually blocking and not asynchronous pieces of work using tokio::task::spawn_blocking