I have a function that returns Stream<Item=Result<..>>
.
use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
{
let mut _guard = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
}
{
yield *item;
}
}
}
}
#[tokio::main]
async fn main() {
let _s = get_stream().boxed();
}
The mutex guard _guard
should be immediately dropped, but rustc
complains get_stream
is not Send due to _guard
.
error: future cannot be sent between threads safely
--> src/main.rs:22:27
|
22 | let _s = get_stream().boxed();
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl futures::Stream`, the trait `std::marker::Send` is not implemented for `*mut ()`
note: future is not `Send` as this value is used across an await
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
... |
16 | | }
17 | | }
| |_____^ first, await occurs here, with `mut _guard` maybe used later...
note: `mut _guard` is later dropped here
--> src/main.rs:6:5
|
6 | / async_stream::try_stream! {
7 | | let a = Arc::new(Mutex::new(()));
8 | | for item in &[1,2,3] {
9 | | {
10 | | let mut _guard = a.try_lock().expect("aa");
| | ---------- has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, ()>` which is not `Send`
... |
16 | | }
17 | | }
| |_____^
= note: this error originates in the macro `async_stream::try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)
Changing the order of statements to
Err(anyhow::anyhow!("asdf"))?;
let mut _guard = a.try_lock().expect("aa");
does not make compile error. Why is this? anyhow
might generate a stacktrace, but does it capture local variables in anyhow::Error
?
The issue here is that the code expands to a match statement including an .await
point where the Err
is sent over the channel used to implement the stream:
let lock = a.try_lock();
match Err({
let error = ::anyhow::private::format_err(
match match () {
() => [],
} {
ref args => unsafe { ::core::fmt::Arguments::new_v1(&["asdf"], args) },
},
);
error
}) {
::core::result::Result::Ok(v) => v,
::core::result::Result::Err(e) => {
__yield_tx.send(::core::result::Result::Err(e.into())).await;
return;
}
};
// ..rest of the function
The lock is treated as alive for the entire match
which includes the await
point, thus you get an !Send
future.
A more minimal example of this issue would be:
fn foo() -> impl Future<Output = ()> + Send {
async {
let e: Result<(), ()> = Err(());
let a = Arc::new(Mutex::new(()));
let mut lock = a.lock();
if e.is_err() {
futures::future::ready(e).await;
return;
}
}
}
The most straight-forward way to fix this is what you already do before yield
ing the item at the end of the function: encapsulate all locking in a scope of its own without emitting any stream items within the scope.
This could be done through scoping within the function - becomes annoying when you want to emit errors using the ?
operator - or through sync functions or sync closures which lock the mutex internally and drop it before returning. This way you can get fairly ergonomic error handling through ?
on the results:
fn get_stream() -> impl futures::Stream<Item = anyhow::Result<u8>> {
async_stream::try_stream! {
let a = Arc::new(Mutex::new(()));
for item in &[1,2,3] {
let scoped = || {
let lock = a.try_lock().expect("aa");
Err(anyhow::anyhow!("asdf"))?;
Ok::<(), anyhow::Error>(*lock)
};
scoped()?;
yield *item;
}
}
}