multithreading, asynchronous, rust, async-await, rust-tokio

An extra println in the main thread causes Rust to produce different results


I ran into a phenomenon while learning multithreading with Rust and tokio, and I don't understand why it happens.

Here is the code.

use std::{thread, time::Duration};

use chrono::Local;

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

async fn async_task(i: u64) {
    thread::sleep(Duration::from_secs(i));
    println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
    for i in 0..10 {
        tokio::spawn(async_task(i));
    }
    println!("{}.main thread", now());
}

I ran the code and found that all 10 async tasks were executed. The result is shown below:

2023-09-05 22:08:05.async task 0!
2023-09-05 22:08:05.main thread
2023-09-05 22:08:06.async task 1!
2023-09-05 22:08:07.async task 2!
2023-09-05 22:08:08.async task 3!
2023-09-05 22:08:09.async task 4!
2023-09-05 22:08:10.async task 5!
2023-09-05 22:08:11.async task 6!
2023-09-05 22:08:12.async task 7!
2023-09-05 22:08:13.async task 8!
2023-09-05 22:08:14.async task 9!

When I comment out the println in the main thread, only a few of the tasks are executed.

#[tokio::main]
async fn main() {
    for i in 0..10 {
        tokio::spawn(async_task(i));
    }
    // println!("{}.main thread", now());
}

The result is shown below:

2023-09-05 22:10:51.async task 0!
2023-09-05 22:10:52.async task 1!

I've tried this many times, and the difference shows up every time: the version with the println executes all async tasks, and the version with it commented out does not.

I really don't understand why a println could make such a big difference. I'd appreciate it if anyone could help.


Solution

  • The real reason your code behaves weirdly is that you use std::thread::sleep instead of tokio::time::sleep.

    It's important when working with async functions that you never block. Async runtimes are non-preemptive, meaning they can only switch between tasks at .await points. That means, if you call std::thread::sleep inside a task, you block the worker thread that the task runs on, and nothing else can make progress on that thread until the sleep returns. That is also why, in your working program, the output is not always printed in the expected order, even though the prints should be a full second apart.
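
    To make the "only at .await points" rule concrete, here is a minimal sketch of a cooperative executor built from the standard library alone. It is a toy and not how tokio is implemented; NoopWaker, YieldOnce, cooperative, hogging, run_all and interleaving_demo are names made up for this illustration. Two tasks yield between steps and one never awaits.

    ```rust
    use std::cell::RefCell;
    use std::future::Future;
    use std::pin::Pin;
    use std::rc::Rc;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that does nothing; enough for a toy executor that busy-polls.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    /// Returns `Pending` on the first poll and `Ready` on the second,
    /// which gives a task exactly one yield point.
    struct YieldOnce {
        yielded: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                Poll::Pending
            }
        }
    }

    type Log = Rc<RefCell<Vec<String>>>;
    type Task = Pin<Box<dyn Future<Output = ()>>>;

    /// Hands control back to the executor after every step.
    async fn cooperative(name: &'static str, log: Log) {
        for step in 0..2 {
            log.borrow_mut().push(format!("{name} step {step}"));
            YieldOnce { yielded: false }.await;
        }
    }

    /// Never awaits, so the executor cannot switch away until it returns.
    async fn hogging(name: &'static str, log: Log) {
        for step in 0..2 {
            log.borrow_mut().push(format!("{name} step {step}"));
        }
    }

    /// Polls every unfinished task in round-robin order until all are done.
    fn run_all(mut tasks: Vec<Task>) {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        while !tasks.is_empty() {
            // Keep only the tasks that are still pending after this poll.
            tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        }
    }

    /// Runs three tasks on one thread and returns the order of their steps.
    fn interleaving_demo() -> Vec<String> {
        let log: Log = Rc::new(RefCell::new(Vec::new()));
        let tasks: Vec<Task> = vec![
            Box::pin(cooperative("a", log.clone())),
            Box::pin(hogging("b", log.clone())),
            Box::pin(cooperative("c", log.clone())),
        ];
        run_all(tasks);
        let entries = log.borrow().clone();
        entries
    }
    ```

    Calling interleaving_demo() yields the steps in the order a0, b0, b1, c0, a1, c1: tasks a and c take turns, while b runs both of its steps back to back because it never hands control back to the executor.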

    If you replace std::thread::sleep with tokio::time::sleep, you will get a consistent behaviour (although it is probably not the behaviour you desire):

    use std::time::Duration;
    
    use chrono::Local;
    
    fn now() -> String {
        Local::now().format("%F %T").to_string()
    }
    
    async fn async_task(i: u64) {
        tokio::time::sleep(Duration::from_secs(i)).await;
        println!("{}.async task {}!", now(), i);
    }
    #[tokio::main]
    async fn main() {
        for i in 0..10 {
            tokio::spawn(async_task(i));
        }
        println!("{}.main thread", now());
        tokio::time::sleep(Duration::from_millis(2500)).await;
    }
    
    2023-09-05 16:39:25.main thread
    2023-09-05 16:39:25.async task 0!
    2023-09-05 16:39:26.async task 1!
    2023-09-05 16:39:27.async task 2!
    

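    Why so? Because when main finishes, tokio shuts down the runtime and cancels all leftover tasks. But again, a task can only be cancelled at an .await point. So in your 'working' example, tokio still tries to cancel the tasks, but a task that is already inside std::thread::sleep never hits an .await point, so it runs to the end and prints. It 'seems' like tokio is waiting for the tasks, but that's actually not true: the shutdown is only waiting for the blocked worker threads to return. Tasks that no worker thread had started yet are dropped without ever running, which is most likely where the println comes in: the extra println delays main just long enough for the workers to pick up every task.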
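
    Cancelling a task boils down to dropping its future, and that can be sketched with the standard library alone. The code below is an illustration under that assumption, not tokio's shutdown code; NoopWaker, NeverReady, task and cancellation_demo are hypothetical names. One task is polled once and suspended at its await point, the other is never polled, and then both are dropped.

    ```rust
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that does nothing; the sketch polls by hand.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    /// Stand-in for a timer that has not fired yet: always `Pending`.
    struct NeverReady;

    impl Future for NeverReady {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    type Log = Arc<Mutex<Vec<String>>>;

    async fn task(id: u32, log: Log) {
        log.lock().unwrap().push(format!("task {id} started"));
        // The only point at which this task can be suspended or cancelled.
        NeverReady.await;
        log.lock().unwrap().push(format!("task {id} finished"));
    }

    /// Starts one task, leaves another unpolled, drops both, returns the log.
    fn cancellation_demo() -> Vec<String> {
        let log: Log = Arc::new(Mutex::new(Vec::new()));
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);

        let mut started = Box::pin(task(0, log.clone()));
        let never_polled = Box::pin(task(1, log.clone()));

        // Task 0 runs up to its await point and is suspended there.
        assert!(started.as_mut().poll(&mut cx).is_pending());

        // "Shutdown": dropping a future is how it gets cancelled.
        drop(started);
        drop(never_polled);

        let entries = log.lock().unwrap().clone();
        entries
    }
    ```

    cancellation_demo() returns a single entry, "task 0 started". Task 0 is cut off at its await point, and task 1 never runs at all because nobody polled it before it was dropped.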

    The simplest way to solve this is to keep the main function from returning before the tasks are done (actually, this is the only way that comes to my mind).

    There are multiple ways to achieve that. In your case, it seems to me that your intention is to wait for all subtasks to finish, and then to end the program. This can be done by awaiting the join handles:

    use std::time::Duration;
    
    use chrono::Local;
    
    fn now() -> String {
        Local::now().format("%F %T").to_string()
    }
    
    async fn async_task(i: u64) {
        tokio::time::sleep(Duration::from_secs(i)).await;
        println!("{}.async task {}!", now(), i);
    }
    #[tokio::main]
    async fn main() {
        let mut joinhandles = Vec::new();
        for i in 0..10 {
            joinhandles.push(tokio::spawn(async_task(i)));
        }
    
        println!("{}.main thread", now());
    
        for joinhandle in joinhandles {
            joinhandle.await.unwrap();
        }
    }
    
    2023-09-05 16:47:23.main thread
    2023-09-05 16:47:23.async task 0!
    2023-09-05 16:47:24.async task 1!
    2023-09-05 16:47:25.async task 2!
    2023-09-05 16:47:26.async task 3!
    2023-09-05 16:47:27.async task 4!
    2023-09-05 16:47:28.async task 5!
    2023-09-05 16:47:29.async task 6!
    2023-09-05 16:47:30.async task 7!
    2023-09-05 16:47:31.async task 8!
    2023-09-05 16:47:32.async task 9!
    

    That said, awaiting them one by one is a little tedious and also prevents propagating errors immediately. Instead, you could use futures::future::try_join_all:

    use std::time::Duration;
    
    use chrono::Local;
    
    fn now() -> String {
        Local::now().format("%F %T").to_string()
    }
    
    async fn async_task(i: u64) {
        tokio::time::sleep(Duration::from_secs(i)).await;
        println!("{}.async task {}!", now(), i);
    }
    #[tokio::main]
    async fn main() {
        // Same as the for-loop in the earlier example,
        // just more compact and functional
        let joinhandles = (0..10)
            .map(|i| tokio::spawn(async_task(i)))
            .collect::<Vec<_>>();
    
        println!("{}.main thread", now());
    
        futures::future::try_join_all(joinhandles).await.unwrap();
    }
    
    2023-09-05 16:56:14.main thread
    2023-09-05 16:56:14.async task 0!
    2023-09-05 16:56:15.async task 1!
    2023-09-05 16:56:16.async task 2!
    2023-09-05 16:56:17.async task 3!
    2023-09-05 16:56:18.async task 4!
    2023-09-05 16:56:19.async task 5!
    2023-09-05 16:56:20.async task 6!
    2023-09-05 16:56:21.async task 7!
    2023-09-05 16:56:22.async task 8!
    2023-09-05 16:56:23.async task 9!
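
    The difference between awaiting handles one by one and failing fast can be modelled with the standard library alone. This is only a rough model under stated assumptions, not tokio or the futures crate: every task gets one poll per round, as if a runtime were driving spawned tasks, and the function reports the round in which the caller first sees an error. Countdown, countdown, sample_tasks and rounds_until_error are hypothetical names.

    ```rust
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Waker that does nothing; the model polls in fixed rounds.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    /// Returns `Pending` for `polls_left` polls, then completes with `result`.
    struct Countdown {
        polls_left: u32,
        result: Option<Result<u32, String>>,
    }

    impl Future for Countdown {
        type Output = Result<u32, String>;

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.polls_left == 0 {
                Poll::Ready(self.result.take().expect("polled after completion"))
            } else {
                self.polls_left -= 1;
                Poll::Pending
            }
        }
    }

    type Task = Pin<Box<dyn Future<Output = Result<u32, String>>>>;

    fn countdown(polls_left: u32, result: Result<u32, String>) -> Task {
        Box::pin(Countdown { polls_left, result: Some(result) })
    }

    /// Task 0 succeeds late, task 1 fails early.
    fn sample_tasks() -> Vec<Task> {
        vec![countdown(5, Ok(0)), countdown(1, Err("boom".to_string()))]
    }

    /// Returns the round in which the caller first observes an error.
    /// `in_order == true` models awaiting the handles one by one: a result
    /// is visible only once every earlier task has finished.
    fn rounds_until_error(mut tasks: Vec<Task>, in_order: bool) -> Option<u32> {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        let mut results: Vec<Option<Result<u32, String>>> =
            tasks.iter().map(|_| None).collect();
        let mut round = 0;

        while results.iter().any(|r| r.is_none()) {
            round += 1;
            // Every unfinished task makes progress in every round.
            for (task, slot) in tasks.iter_mut().zip(results.iter_mut()) {
                if slot.is_none() {
                    if let Poll::Ready(res) = task.as_mut().poll(&mut cx) {
                        *slot = Some(res);
                    }
                }
            }
            let visible = if in_order {
                // Only the leading run of finished tasks can be inspected.
                results.iter().take_while(|r| r.is_some()).count()
            } else {
                results.len()
            };
            if results[..visible].iter().any(|r| matches!(r, Some(Err(_)))) {
                return Some(round);
            }
        }
        None
    }
    ```

    With these sample tasks, rounds_until_error(sample_tasks(), true) returns Some(6) and rounds_until_error(sample_tasks(), false) returns Some(2). Both tasks make the same progress in both cases; the in-order caller just does not look at the failure until the slower first task is done.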