Search code examples
rustrust-tokio

Is it possible to preserve items in a Tokio MPSC when the last Sender is dropped, but the Reciever is still active?


I have a bounded Tokio MPSC queue with one sender and one reciever. I put several items into it in one task (in this case, the numbers 1 to 10, inclusive) and pull those items out in another task. As expected, the receiver processes all 10 values.

Then, I add some other async task in between processing (in this case, by uncommenting the sleep call in the receive loop) and now, the contents of the channel are dropped when the last sender is finished.

use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(3);
    
    tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("Recieved {}", val);
            //sleep(Duration::from_secs(1)).await;
        }
        
        println!("Recieve finished");
    });
    
    for i in 1..=10i32 {
        tx.send(i).await.unwrap();
        println!("Sent {}", i);
    }
}

Rust Playground

Here's the output with the sleep commented out (order of interleaved values sometimes changes, but "Recieved 10" is always printed):

Sent 1
Recieved 1
Recieved 2
Sent 2
Sent 3
Recieved 3
Recieved 4
Sent 4
Sent 5
Recieved 5
Recieved 6
Sent 6
Sent 7
Recieved 7
Recieved 8
Sent 8
Sent 9
Recieved 9
Recieved 10
Sent 10

And here's the output with the sleep not commented:

Sent 1
Sent 2
Sent 3
Recieved 1
Sent 4
Recieved 2
Sent 5
Recieved 3
Sent 6
Recieved 4
Sent 7
Recieved 5
Sent 8
Recieved 6
Sent 9
Recieved 7
Sent 10

Is there any way to be certain that all values put into the queue are processed by the Reciever even after the last Sender is dropped (assuming that the Reciever is not dropped)? The close function appears to do this, but going the other way (ensuring that the queue contents are processed before the Reciever is dropped). If not, is there an alternate async-friendly MPSC implementation that can provide this guarantee?


Solution

  • As mentioned in the comments, the issue is that your program exits before the receiver thread finishes. Just make sure you wait for it before exiting:

    use tokio::sync::mpsc;
    use tokio::time::sleep;
    use std::time::Duration;
    
    #[tokio::main]
    async fn main() {
        let (mut tx, mut rx) = mpsc::channel(3);
        
        let join_handle = tokio::spawn(async move {
            while let Some(val) = rx.recv().await {
                println!("Received {}", val);
                //sleep(Duration::from_secs(1)).await;
            }
            
            println!("Receive finished");
        });
        
        for i in 1..=10i32 {
            tx.send(i).await.unwrap();
            println!("Sent {}", i);
        }
        
        std::mem::drop(tx); // Drop the sender so the receiver doesn't listen forever
        join_handle.await.unwrap(); // Wait for the receiver to finish processing
    }
    
    Sent 1
    Received 1
    Received 2
    Sent 2
    Sent 3
    Sent 4
    Sent 5
    Received 3
    Received 4
    Received 5
    Sent 6
    Sent 7
    Sent 8
    Received 6
    Received 7
    Received 8
    Sent 9
    Sent 10
    Received 9
    Received 10
    Receive finished