While learning multithreading with Rust's tokio, I ran into some behavior that I can't explain.
Here is the code.
use std::{thread, time::Duration};
use chrono::Local;
fn now() -> String {
    Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
    thread::sleep(Duration::from_secs(i));
    println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
    for i in 0..10 {
        tokio::spawn(async_task(i));
    }
    println!("{}.main thread", now());
}
I ran the code and found that all 10 async tasks were executed. The result was as follows:
2023-09-05 22:08:05.async task 0!
2023-09-05 22:08:05.main thread
2023-09-05 22:08:06.async task 1!
2023-09-05 22:08:07.async task 2!
2023-09-05 22:08:08.async task 3!
2023-09-05 22:08:09.async task 4!
2023-09-05 22:08:10.async task 5!
2023-09-05 22:08:11.async task 6!
2023-09-05 22:08:12.async task 7!
2023-09-05 22:08:13.async task 8!
2023-09-05 22:08:14.async task 9!
When I comment out the println call in main, only a few of the tasks are executed.
#[tokio::main]
async fn main() {
    for i in 0..10 {
        tokio::spawn(async_task(i));
    }
    // println!("{}.main thread", now());
}
The result was as follows:
2023-09-05 22:10:51.async task 0!
2023-09-05 22:10:52.async task 1!
I've tried this many times, and the difference shows up every time: the version that keeps the println executes all async tasks, and the version with it commented out does not.
I really don't understand why a println could make such a big difference. I'd appreciate it if anyone could help.
The real reason your code behaves weirdly is that you use std::thread::sleep instead of tokio::time::sleep.
It's important when working with async functions that you never block. Async runtimes schedule cooperatively rather than preemptively, meaning they can only switch between tasks at .await points. That means that if you call std::thread::sleep, you block the whole worker thread the task is running on (and, on a single-threaded runtime, the entire program) until the sleep returns. That's also why, in your working program, the output isn't always printed in the order you would expect, even though the prints are a full second apart.
If you replace std::thread::sleep with tokio::time::sleep, you will get consistent behaviour (although it is probably not the behaviour you want):
use std::time::Duration;
use chrono::Local;
fn now() -> String {
    Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
    // Async sleep: yields to the scheduler instead of blocking the thread
    tokio::time::sleep(Duration::from_secs(i)).await;
    println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
    for i in 0..10 {
        tokio::spawn(async_task(i));
    }
    println!("{}.main thread", now());
    // Keep main alive for 2.5 s; tasks still sleeping after that are cancelled
    tokio::time::sleep(Duration::from_millis(2500)).await;
}
2023-09-05 16:39:25.main thread
2023-09-05 16:39:25.async task 0!
2023-09-05 16:39:26.async task 1!
2023-09-05 16:39:27.async task 2!
Why? Because when main finishes, tokio cancels all leftover tasks. But again, tokio can only schedule at .await points, so in your 'working' example, tokio still tries to cancel the tasks but can't, because they never reach an .await point. So it looks as if tokio is deliberately waiting for the tasks to finish, but it is really just unable to interrupt them.
The simplest way to solve this is to not let the main function end (actually, this is the only way that comes to my mind).
There are multiple ways to achieve that. In your case, it seems that your intention is to wait for all subtasks to finish and then end the program. This can be done by awaiting the join handles:
use std::time::Duration;
use chrono::Local;
fn now() -> String {
    Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
    tokio::time::sleep(Duration::from_secs(i)).await;
    println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
    // Keep every JoinHandle so the tasks can be awaited later
    let mut joinhandles = Vec::new();
    for i in 0..10 {
        joinhandles.push(tokio::spawn(async_task(i)));
    }
    println!("{}.main thread", now());
    // Wait for each task in turn; unwrap panics if a task panicked
    for joinhandle in joinhandles {
        joinhandle.await.unwrap();
    }
}
2023-09-05 16:47:23.main thread
2023-09-05 16:47:23.async task 0!
2023-09-05 16:47:24.async task 1!
2023-09-05 16:47:25.async task 2!
2023-09-05 16:47:26.async task 3!
2023-09-05 16:47:27.async task 4!
2023-09-05 16:47:28.async task 5!
2023-09-05 16:47:29.async task 6!
2023-09-05 16:47:30.async task 7!
2023-09-05 16:47:31.async task 8!
2023-09-05 16:47:32.async task 9!
That said, awaiting them one by one is a little tedious and also prevents propagating errors immediately. Instead, you could use futures::future::try_join_all:
use std::time::Duration;
use chrono::Local;
fn now() -> String {
    Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
    tokio::time::sleep(Duration::from_secs(i)).await;
    println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
    // Same as the for-loop in the earlier example,
    // just more compact and functional
    let joinhandles = (0..10)
        .map(|i| tokio::spawn(async_task(i)))
        .collect::<Vec<_>>();
    println!("{}.main thread", now());
    futures::future::try_join_all(joinhandles).await.unwrap();
}
2023-09-05 16:56:14.main thread
2023-09-05 16:56:14.async task 0!
2023-09-05 16:56:15.async task 1!
2023-09-05 16:56:16.async task 2!
2023-09-05 16:56:17.async task 3!
2023-09-05 16:56:18.async task 4!
2023-09-05 16:56:19.async task 5!
2023-09-05 16:56:20.async task 6!
2023-09-05 16:56:21.async task 7!
2023-09-05 16:56:22.async task 8!
2023-09-05 16:56:23.async task 9!