Search code examples
rustrust-tokio

async-stream + anyhow is not Send


I have a function that returns Stream<Item=Result<..>>.

use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;

fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
    async_stream::try_stream! {
        let a = Arc::new(Mutex::new(()));
        for item in &[1,2,3] {
            {
                let mut _guard = a.try_lock().expect("aa");
                Err(anyhow::anyhow!("asdf"))?;
            }
            {
                yield *item;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let _s = get_stream().boxed();
}

The mutex guard _guard should be immediately dropped, but rustc complains get_stream is not Send due to _guard.

error: future cannot be sent between threads safely
  --> src/main.rs:22:27
   |
22 |     let _s = get_stream().boxed();
   |                           ^^^^^ future created by async block is not `Send`
   |
   = help: within `impl futures::Stream`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
  --> src/main.rs:6:5
   |
6  | /     async_stream::try_stream! {
7  | |         let a = Arc::new(Mutex::new(()));
8  | |         for item in &[1,2,3] {
9  | |             {
...  |
16 | |         }
17 | |     }
   | |_____^ first, await occurs here, with `mut _guard` maybe used later...
note: `mut _guard` is later dropped here
  --> src/main.rs:6:5
   |
6  | /     async_stream::try_stream! {
7  | |         let a = Arc::new(Mutex::new(()));
8  | |         for item in &[1,2,3] {
9  | |             {
10 | |                 let mut _guard = a.try_lock().expect("aa");
   | |                     ---------- has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, ()>` which is not `Send`
...  |
16 | |         }
17 | |     }
   | |_____^
   = note: this error originates in the macro `async_stream::try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)

Changing the order of statements to

Err(anyhow::anyhow!("asdf"))?;
let mut _guard = a.try_lock().expect("aa");            

does not make compile error. Why is this? anyhow might generate a stacktrace, but does it capture local variables in anyhow::Error?


Solution

  • The issue here is that the code expands to a match statement including an .await point where the Err is sent over the channel used to implement the stream:

    let lock = a.try_lock();
    match Err({
        let error = ::anyhow::private::format_err(
            match match () {
                () => [],
            } {
                ref args => unsafe { ::core::fmt::Arguments::new_v1(&["asdf"], args) },
            },
        );
        error
    }) {
        ::core::result::Result::Ok(v) => v,
        ::core::result::Result::Err(e) => {
            __yield_tx.send(::core::result::Result::Err(e.into())).await;
            return;
        }
    };
    // ..rest of the function
    

    The lock is treated as alive for the entire match which includes the await point, thus you get an !Send future.

    A more minimal example of this issue would be:

    fn foo() -> impl Future<Output = ()> + Send {
        async {
            let e: Result<(), ()> = Err(());
            let a = Arc::new(Mutex::new(()));
            let mut lock = a.lock();
            if e.is_err() {
                futures::future::ready(e).await;
                return;
            }
        }
    }
    

    The most straight-forward way to fix this is what you already do before yielding the item at the end of the function: encapsulate all locking in a scope of its own without emitting any stream items within the scope.

    This could be done through scoping within the function - becomes annoying when you want to emit errors using the ? operator - or through sync functions or sync closures which lock the mutex internally and drop it before returning. This way you can get fairly ergonomic error handling through ? on the results:

    fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
        async_stream::try_stream! {
            let a = Arc::new(Mutex::new(()));
            for item in &[1,2,3] {
                let scoped = || {
                    let lock = a.try_lock().expect("aa");
                    Err(anyhow::anyhow!("asdf"))?;
                    Ok::<(), anyhow::Error>(*lock)
                };
                scoped()?;
                yield *item;
            }
        }
    }