Search code examples
multithreadingasynchronousrustasync-await

Recursive async function: future cannot be sent between threads safely


I want to make a tool to iterate over a specific directory. It will read the video duration and resolution using ffmpeg when finding a video file, and that's a time-consuming work. Therefore I want to use async functions to improve the performance. However, it cannot pass the compilation check. Here is the minimal reproducible example:

use std::sync::Arc;

use tokio::sync::Mutex;

#[derive(Debug)]
enum Entry {
    Dir(Vec<Entry>),
    File,
}

async fn iterate_dir() -> Entry {
    let entries = Arc::new(Mutex::new(Vec::<Entry>::new()));
    let mut tasks = tokio::task::JoinSet::<()>::new();
    for _entry in 0..10 {
        let entries_cloned = entries.clone();
        if rand::random::<f32>() > 0.8 {
            // It's directory
            tasks.spawn(async move {
                entries_cloned.lock().await.push(iterate_dir().await);
            });
        } else {
            // It's file
            entries_cloned.lock().await.push(Entry::File);
        }
    }
    tasks.join_all().await;
    Entry::Dir(Arc::try_unwrap(entries).unwrap().into_inner())
}

And a more concise example after trimming all statements irrelevant to the error message:

async fn test() {
    tokio::spawn(async move {
        test().await;
    });
}

The diagnostic message are both like:

error: future cannot be sent between threads safely
   --> src/main.rs:2:5
    |
2   | /     tokio::spawn(async move {
3   | |         test().await;
4   | |     });
    | |______^ future created by async block is not `Send`
    |
    = note: cannot satisfy `impl Future<Output = ()>: Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/main.rs:3:9
    |
3   |         test().await;
    |         ^^^^^^ await occurs here on type `impl Future<Output = ()>`, which is not `Send`
note: required by a bound in `tokio::spawn`
   --> C:\Users\pc\.cargo\registry\src\index.crates.io-6f17d22bba15001f\tokio-1.40.0\src\task\spawn.rs:167:21
    |
165 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
166 |     where
167 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

I've googled this question before posting this question, but the answers I got are mostly about adding + Send to your Result<(), Box<dyn Error>>. However there's no such statement in my code. So what should I do to solve this problem?


Solution

  • This is probably because of this bug. The workaround is to replace the async functions with functions returning impl Future + Send and put an async block directly inside:

    playground

    use std::future::Future;
    use std::sync::Arc;
    
    use tokio::sync::Mutex;
    
    #[derive(Debug)]
    enum Entry {
        Dir(Vec<Entry>),
        File,
    }
    
    fn iterate_dir() -> impl Future<Output=Entry> + Send {async {
        let entries = Arc::new(Mutex::new(Vec::<Entry>::new()));
        let mut tasks = tokio::task::JoinSet::<()>::new();
        for _entry in 0..10 {
            let entries_cloned = entries.clone();
            // let entries.cloned
            if rand::random::<f32>() > 0.8 {
                // It's directory
                tasks.spawn(async move {
                    entries_cloned.lock().await.push(iterate_dir().await);
                });
            } else {
                // It's file
                entries_cloned.lock().await.push(Entry::File);
            }
        }
        tasks.join_all().await;
        Entry::Dir(Arc::try_unwrap(entries).unwrap().into_inner())
    }}
    

    playground

    use std::future::Future;
    
    fn test() -> impl Future<Output = ()> + Send {async {
        tokio::spawn(async move {
            test().await;
        });
    }}