Search code examples
rustasync-awaitborrow-checkerrust-tokio

How do I encapsulate waiting for a `tokio::sync::watch::Receiver` to be `Some`?


I'd like to write an async function recv_some which takes a watch::Receiver<Option<T>>, waits for the value to be Some, and returns something which Deref's to T. For reasons I sort of understand, when I do this the obvious way, the borrow checker complains. I can't seem to work around the issue either.

use std::ops::Deref;

use tokio::sync::watch;

pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);

impl<T> Deref for WatchSomeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

pub async fn recv_some<T>(
    rx: &mut watch::Receiver<Option<T>>,
) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
    loop {
        {
            let guard = rx.borrow_and_update();
            if guard.is_some() {
                return Ok(WatchSomeRef(guard));
            }
        }
        rx.changed().await?;
    }
}

When I do this, I get the error:

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:20:25
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         ^^ `*rx` was mutably borrowed here in the previous iteration of the loop
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`

error[E0499]: cannot borrow `*rx` as mutable more than once at a time
  --> src/lib.rs:25:9
   |
16 |     rx: &mut watch::Receiver<Option<T>>,
   |         - let's call the lifetime of this reference `'1`
...
20 |             let guard = rx.borrow_and_update();
   |                         -- first mutable borrow occurs here
21 |             if guard.is_some() {
22 |                 return Ok(WatchSomeRef(guard));
   |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`
...
25 |         rx.changed().await?;
   |         ^^ second mutable borrow occurs here

ETA:

I think this happens because which lifetime we want on the guard depends on the underlying value. If it's Some, it needs to outlast the function call, but if it's None it needs to be dropped right away so we can wait for the next change.

I still have no idea how to work around it though (except using an unsafe mem::transmute to change the lifetime).


Solution

  • It seems to be a false positive by the borrow checker, probably closely related to its known limitations discussed here.

    If you use watch::Receiver::wait_for, it compiles fine:

    use std::ops::Deref;
    
    use tokio::sync::watch;
    
    pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);
    
    impl<T> Deref for WatchSomeRef<'_, T> {
        type Target = T;
    
        fn deref(&self) -> &Self::Target {
            self.0.as_ref().unwrap()
        }
    }
    
    pub async fn recv_some<T>(
        rx: &mut watch::Receiver<Option<T>>,
    ) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
        let value = rx.wait_for(|val| val.is_some()).await?;
        Ok(WatchSomeRef(value))
    }
    

    Or its more compact version:

    pub async fn recv_some<T>(
        rx: &mut watch::Receiver<Option<T>>,
    ) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
        rx.wait_for(Option::is_some).await.map(WatchSomeRef)
    }