I'm learning the synchronization primitives of tokio. Looking at the example code for Notify, I find it hard to understand why Channel<T> is mpsc (multi-producer, single-consumer) and not mpmc.
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify_one();
    }

    // This is a single-consumer channel, so several concurrent calls to
    // `recv` are not allowed.
    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}
But after I wrote some test code, I could not find any case in which a consumer loses a notification from a producer.
Could someone give me test code that proves the above Channel<T> fails to work correctly as an mpmc channel?
The following code shows why it is unsafe to use the above channel as mpmc.
use std::sync::Arc;

// Uses Channel, Mutex, VecDeque and Notify from the code above.
#[tokio::main]
async fn main() {
    let mut round = 0;
    loop {
        let ch = Arc::new(Channel {
            values: Mutex::new(VecDeque::new()),
            notify: Notify::new(),
        });
        let mut handles = vec![];
        // Spawn 100 senders and 100 receivers, two at a time.
        for i in 0..100 {
            if i % 2 == 1 {
                for _ in 0..2 {
                    let sender = ch.clone();
                    tokio::spawn(async move {
                        sender.send(1);
                    });
                }
            } else {
                for _ in 0..2 {
                    let receiver = ch.clone();
                    let handle = tokio::spawn(async move {
                        receiver.recv().await;
                    });
                    handles.push(handle);
                }
            }
        }
        // Never completes if any receiver missed its notification.
        futures::future::join_all(handles).await;
        round += 1;
        println!("No.{round} loop finished.");
    }
}
If the program stops printing and never starts the next loop iteration, at least one consumer task never finished, which means it missed a notification.
Quote from the documentation you linked:
If you have two calls to recv and two calls to send in parallel, the following could happen:
- Both calls to try_recv return None.
- Both new elements are added to the vector.
- The notify_one method is called twice, adding only a single permit to the Notify.
- Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.
Replace try_recv with self.values.lock().unwrap().pop_front() in our case; the rest of the explanation stays identical.
The third point is the important one: multiple calls to notify_one result in only a single permit if no task is waiting yet. And there is a short time window in which multiple tasks have already checked for the existence of an item but aren't waiting yet.