I am surprised by the behaviour of this code. Note that I am intentionally sleeping with std::thread::sleep in async code so that there are no .await points.
use std::time::Duration;
use tokio::sync::mpsc;

async fn send(tx: mpsc::Sender<u32>) {
    loop {
        std::thread::sleep(Duration::from_secs(1));
        tx.try_send(42).unwrap();
        println!("sent");
    }
}

async fn recv(mut rx: mpsc::Receiver<u32>) {
    while let Some(_) = rx.recv().await {
        println!("recv");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    tokio::spawn(send(tx.clone()));
    tokio::spawn(recv(rx));
    // let it run for a bit to see the channel fill up and the send task panic
    tokio::time::sleep(Duration::from_secs(100)).await;
}
What I'm surprised by is that the receiver doesn't get a chance to recv().await until the channel fills up, causing the send task to panic.
The reason I'm surprised is that I have more than 2 cores on my computer, and I'm aware the Tokio runtime defaults to as many core threads as CPU cores, so are these two tasks not run on separate threads? This makes me suspect that both tasks are being run on one core thread.
Do all tasks share a core thread until the runtime decides a new one is needed? Does it spawn all at start up (n=num cpu cores) but still run tasks on one thread until more are required? Naively I assumed task 1 would get core thread 1, then task 2 would get core thread 2. (Obviously in this pathological example the programmer has violated the minimal requirement of async code: not to block.)
There is no guarantee whatsoever about which thread a spawned task ends up on, and with relatively few tasks (say, 2) it's likely that they'll end up on the same thread. You can check that by simply asking for the current thread's ID:
std::thread::current().id()
With that you can check whether the tasks are being put on different threads:
async fn send(tx: mpsc::Sender<u32>) {
    loop {
        std::thread::sleep(Duration::from_secs(1));
        tx.try_send(42).unwrap();
        println!("{:?} sent", std::thread::current().id());
    }
}
Together with an equivalent modification to recv, this gives me, for example, the following output on my machine:
ThreadId(3) sent
ThreadId(3) sent
ThreadId(3) sent
ThreadId(3) sent
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: "Full(..)"', src/main.rs:7:25
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
ThreadId(3) recv
ThreadId(3) recv
ThreadId(3) recv
ThreadId(3) recv
So yes, the two tasks do indeed end up running on the same OS thread.
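You can read about why another thread isn't able to steal the recv task in the documentation on disable_lifo_slot. In short, each worker thread has a LIFO slot holding the task it should poll next, and, as that documentation describes, a task sitting in this slot cannot be stolen by other worker threads. The recv task woken by try_send lands in the slot of the worker that is busy running send, so it has to wait until send gives up that thread. Note that you have to enable tokio_unstable (by passing --cfg tokio_unstable to rustc, for example via RUSTFLAGS) to use disable_lifo_slot.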
Indeed the version with the slot disabled does distribute the tasks as you'd expect:
fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .disable_lifo_slot()
        .enable_time()
        .build()
        .unwrap();
    rt.block_on(async {
        println!("{:?}", std::thread::current().id());
        let (tx, rx) = mpsc::channel(4);
        tokio::task::spawn(send(tx));
        tokio::spawn(recv(rx)).await.unwrap();
    });
}
ThreadId(5) sent
ThreadId(4) recv
ThreadId(5) sent
ThreadId(3) recv
ThreadId(5) sent
ThreadId(4) recv
ThreadId(5) sent
…
That's not usually a problem if, as required, all tasks cooperate (i.e. yield by using .await at some point during their work), or if you run the pieces of work that actually block and are not asynchronous via tokio::task::spawn_blocking.