I have a bounded Tokio MPSC queue with one sender and one receiver. I put several items into it in one task (in this case, the numbers 1 to 10, inclusive) and pull those items out in another task. As expected, the receiver processes all 10 values.
Then, I add some other async work in between processing (in this case, by uncommenting the sleep call in the receive loop) and now the contents of the channel are dropped when the last sender is finished.
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(3);

    tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("Recieved {}", val);
            //sleep(Duration::from_secs(1)).await;
        }
        println!("Recieve finished");
    });

    for i in 1..=10i32 {
        tx.send(i).await.unwrap();
        println!("Sent {}", i);
    }
}
Here's the output with the sleep commented out (the order of interleaved values sometimes changes, but "Recieved 10" is always printed):
Sent 1
Recieved 1
Recieved 2
Sent 2
Sent 3
Recieved 3
Recieved 4
Sent 4
Sent 5
Recieved 5
Recieved 6
Sent 6
Sent 7
Recieved 7
Recieved 8
Sent 8
Sent 9
Recieved 9
Recieved 10
Sent 10
And here's the output with the sleep not commented out:
Sent 1
Sent 2
Sent 3
Recieved 1
Sent 4
Recieved 2
Sent 5
Recieved 3
Sent 6
Recieved 4
Sent 7
Recieved 5
Sent 8
Recieved 6
Sent 9
Recieved 7
Sent 10
Is there any way to be certain that all values put into the queue are processed by the Receiver even after the last Sender is dropped (assuming that the Receiver is not dropped)? The close function appears to do this, but going the other way (ensuring that the queue contents are processed before the Receiver is dropped). If not, is there an alternate async-friendly MPSC implementation that can provide this guarantee?
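As mentioned in the comments, the issue is that your program exits before the receiver task finishes: when main returns, the Tokio runtime shuts down and any spawned task that is still running is cancelled, so the values left in the channel are never processed. Just make sure you wait for the task before exiting: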
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(3);

    let join_handle = tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("Received {}", val);
            //sleep(Duration::from_secs(1)).await;
        }
        println!("Receive finished");
    });

    for i in 1..=10i32 {
        tx.send(i).await.unwrap();
        println!("Sent {}", i);
    }

    std::mem::drop(tx); // Drop the sender so the receiver doesn't listen forever
    join_handle.await.unwrap(); // Wait for the receiver to finish processing
}
Sent 1
Received 1
Received 2
Sent 2
Sent 3
Sent 4
Sent 5
Received 3
Received 4
Received 5
Sent 6
Sent 7
Sent 8
Received 6
Received 7
Received 8
Sent 9
Sent 10
Received 9
Received 10
Receive finished