Tags: asynchronous, rust, rust-tokio

Tokio subtask within task not executing as expected


Although I am new to Rust, after reading the Rust and Tokio books I thought I understood how async tasks work. Evidently I missed something crucial, because I cannot resolve the following problem:

I have an async function (asfunc) that needs to poll some sensors indefinitely. For the purposes of demonstration and testing, I created the following stylized function emulating a steady source:

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep( time::Duration::from_secs(1));
        let now = time::Instant::now(); 
        tx.send(format!("[{}] {:?}",name, now)).unwrap();
    }
}

I will have to run several of these concurrently from within a main program, and some of these async functions will be wrapped in another async function (wrapper()) that post-processes the data received. Below is the core functionality (it's in a file called th_test.rs):

use std::sync::mpsc::Sender; 
use std::{thread, time};

async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    
    tokio::task::spawn(async move {
        asfunc(tx,"two".to_string()).await;
    });

    while let Ok(msg) = rx.recv() {
        println!("[wrapper] {:?}",msg);
    }
}

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep( time::Duration::from_secs(1));
        let now = time::Instant::now(); 
        tx.send(format!("[{}] {:?}",name, now)).unwrap();
    }
}

pub async fn run_test() {    
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    
    tokio::task::spawn(async move {
        asfunc(tx,"one".to_string()).await;
    });

    tokio::task::spawn(async move {
        wrapper().await;
    });

    while let Ok(msg) = rx.recv() {
        println!("[asfunc] {:?}",msg);
    }
}

And for completeness' sake, here is the main.rs file:

mod th_test;

#[tokio::main]
async fn main() {    
    th_test::run_test().await;
}

When I run this, I expect some output from the wrapper function as well, since its code is pretty much identical to what I have in run_test(). However, the only output I get comes from the asfunc spawned directly in run_test():

[asfunc] [one] Instant { t: 797790.6589759s }
[asfunc] [one] Instant { t: 797791.6660039s }
[asfunc] [one] Instant { t: 797792.678952s }
...

What am I doing wrong here? Any help would be greatly appreciated!


Solution

  • I fully agree with everything @ChayimFriedman says.

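    Here is some code to accompany his answer, written on the basis that you say you need to use std::sync::mpsc. It makes two changes: asfunc waits with tokio::time::sleep(...).await instead of thread::sleep, so it yields to the runtime while waiting, and each blocking rx.recv() loop moves into tokio::task::spawn_blocking, so it no longer ties up an async worker thread: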

    use std::sync::mpsc::Sender;
    use std::time;
    
    async fn wrapper() {
        let (tx, rx) = std::sync::mpsc::channel::<String>();
    
        tokio::task::spawn(async move {
            asfunc(tx, "two".to_string()).await;
        });
    
        tokio::task::spawn_blocking(move || {
            while let Ok(msg) = rx.recv() {
                println!("[wrapper] {:?}", msg);
            }
        })
        .await
        .unwrap();
    }
    
    async fn asfunc(tx: Sender<String>, name: String) {
        loop {
            tokio::time::sleep(time::Duration::from_secs(1)).await;
            let now = time::Instant::now();
            tx.send(format!("[{}] {:?}", name, now)).unwrap();
        }
    }
    
    pub async fn run_test() {
        let (tx, rx) = std::sync::mpsc::channel::<String>();
    
        tokio::task::spawn(async move {
            asfunc(tx, "one".to_string()).await;
        });
    
        tokio::task::spawn(async move {
            wrapper().await;
        });
    
        tokio::task::spawn_blocking(move || {
            while let Ok(msg) = rx.recv() {
                println!("[asfunc] {:?}", msg);
            }
        })
        .await
        .unwrap();
    }
    
    #[tokio::main]
    async fn main() {
        run_test().await;
    }

    Output:
    
    [wrapper] "[two] Instant { tv_sec: 100182, tv_nsec: 4500700 }"
    [asfunc] "[one] Instant { tv_sec: 100182, tv_nsec: 4714700 }"
    [wrapper] "[two] Instant { tv_sec: 100183, tv_nsec: 5557800 }"
    [asfunc] "[one] Instant { tv_sec: 100183, tv_nsec: 5976800 }"
    [wrapper] "[two] Instant { tv_sec: 100184, tv_nsec: 7570800 }"
    [asfunc] "[one] Instant { tv_sec: 100184, tv_nsec: 7329700 }"
    [wrapper] "[two] Instant { tv_sec: 100185, tv_nsec: 8777300 }"
    [asfunc] "[one] Instant { tv_sec: 100185, tv_nsec: 9211100 }"
    [asfunc] "[one] Instant { tv_sec: 100186, tv_nsec: 11582400 }"
    [wrapper] "[two] Instant { tv_sec: 100186, tv_nsec: 11741700 }"
    [wrapper] "[two] Instant { tv_sec: 100187, tv_nsec: 13959800 }"
    ...