
Why is the channel in the example code of tokio::sync::Notify an MPSC channel?


I'm learning tokio's synchronization primitives. Looking at the example code for Notify, I don't understand why Channel<T> is only an MPSC (multi-producer, single-consumer) channel.

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify_one();
    }

    // This is a single-consumer channel, so several concurrent calls to
    // `recv` are not allowed.
    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}
  • If there are elements in values, the consumer task takes one and returns it.
  • If values is empty, the consumer task waits until a producer notifies it.

But after writing some test code, I could not find a single case in which the consumer misses a notification from a producer.

Could someone give me test code showing that the above Channel<T> fails when used as an MPMC (multi-producer, multi-consumer) channel?


The following code shows why it is incorrect to use the above channel as an MPMC channel.

use std::sync::Arc;

// Uses Channel, Mutex, VecDeque and Notify from the code above.
// Requires the `futures` crate for `join_all`.
#[tokio::main]
async fn main() {
    let mut round = 0;
    loop {
        let ch = Arc::new(Channel {
            values: Mutex::new(VecDeque::new()),
            notify: Notify::new(),
        });

        let mut handles = vec![];

        // Alternate between spawning two receivers and two senders.
        for i in 0..100 {
            if i % 2 == 1 {
                for _ in 0..2 {
                    let sender = ch.clone();
                    tokio::spawn(async move {
                        sender.send(1);
                    });
                }
            } else {
                for _ in 0..2 {
                    let receiver = ch.clone();
                    let handle = tokio::spawn(async move {
                        receiver.recv().await;
                    });
                    handles.push(handle);
                }
            }
        }

        // Hangs forever if any receiver missed its notification.
        futures::future::join_all(handles).await;
        round += 1;
        println!("No.{round} loop finished.");
    }
}

If the program stops printing "loop finished" lines, the current round never completed: at least one consumer task is still waiting, which means it missed a notification.


Solution

  • Quote from the documentation you linked:

    If you have two calls to recv and two calls to send in parallel, the following could happen:

    1. Both calls to try_recv return None.
    2. Both new elements are added to the vector.
    3. The notify_one method is called twice, adding only a single permit to the Notify.
    4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

    In our case, read try_recv as self.values.lock().unwrap().pop_front(); the rest of the explanation is identical.

    The third point is the important one: multiple calls to notify_one store only a single permit if no task is waiting yet. And there is a short time window in which several tasks have already checked for an item but are not waiting yet.