Search code examples
rustfuturerust-tokio

How to find if tokio::sync::mpsc::Receiver has been closed?


I have a loop where I do some work and send result with Sender. The work takes time and I need to retry it in case of failure. It's possible that while I retry it, the receiver has been closed and my retries are going to be a waste of time. Because of this, I need a way to check if Receiver is available without sending a message.

In an ideal world, I want my code to look like this in pseudocode:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};

The problem is that Sender doesn't have an is_closed method. It does have pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>, but I don't know what Context is or where can I find it.

When I don't have a value to send, how can I check if the sender is able to send?


Solution

  • Sender has a try_send method:

    Attempts to immediately send a message on this Sender

    This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).

    Use it instead of send and check for the error:

    if let Err(TrySendError::Closed(_)) = tx.send(result).await {
        break;
    }
    

    It is possible to do what you want by using poll_fn from futures crate. It adapts a function returning Poll to return a Future

    use futures::future::poll_fn; // 0.3.5
    use std::future::Future;
    use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
    use tokio::time::delay_for; // 0.2.22
    
    fn wait_until_ready<'a, T>(
        sender: &'a mut Sender<T>,
    ) -> impl Future<Output = Result<(), ClosedError>> + 'a {
        poll_fn(move |cx| sender.poll_ready(cx))
    }
    
    #[tokio::main]
    async fn main() {
        let (mut tx, mut rx) = channel::<i32>(1);
    
        tokio::spawn(async move {
            // Receive one value and close the channel;
            let val = rx.recv().await;
            println!("{:?}", val);
        });
    
        wait_until_ready(&mut tx).await.unwrap();
        tx.send(123).await.unwrap();
    
        wait_until_ready(&mut tx).await.unwrap();
        delay_for(std::time::Duration::from_secs(1)).await;
        tx.send(456).await.unwrap(); // 456 likely never printed out,
                                     // despite having a positive readiness response
                                     // and the send "succeeding"
    }
    

    Note, however, that in the general case this is susceptible to TOCTOU. Even though Sender's poll_ready reserves a slot in the channel for later usage, it is possible that the receiving end is closed between the readiness check and the actual send. I tried to indicate this in the code.