Search code examples
asynchronousrustrust-tokio

How to use Context and Wakers when implementing Future in practice


I am finding it difficult to understand why and when I need to explicitly do something with the Context and/or its Waker passed to the poll method on an object for which I am implementing Future. I have been reading the documentation from Tokio and the Async Book, but I feel the examples/methods are too abstract to be applied to real problems.

For example, I would have thought the following MRE would deadlock since the future generated by new_inner_task would not know when a message has been passed on the MPSC channel, however, this example seems to work fine. Why is this the case?

use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
use futures::{FutureExt, StreamExt}; // 0.3
use tokio::sync::mpsc; // 1.2
use tokio_stream::wrappers::UnboundedReceiverStream; // 0.1

async fn new_inner_task(rx: mpsc::UnboundedReceiver<()>) {
    let mut requests = UnboundedReceiverStream::new(rx);
    while let Some(_) = requests.next().await {
        eprintln!("received request");
    }
}

pub struct ActiveObject(Pin<Box<dyn Future<Output = ()> + Send>>);

impl ActiveObject {
    pub fn new() -> (Self, mpsc::UnboundedSender<()>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Self(new_inner_task(rx).boxed()), tx)
    }
}

impl Future for ActiveObject {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        eprintln!("[polled]");
        self.get_mut().0.as_mut().poll(cx)
    }
}

async fn delayed_send(delay: u64, sender: mpsc::UnboundedSender<()>) {
    tokio::time::sleep(Duration::from_millis(delay)).await;
    sender.send(()).unwrap();
    eprintln!("sent request");
}

#[tokio::main]
async fn main() {
    let (obj, tx) = ActiveObject::new();
    let ds = delayed_send(500, tx.clone());
    let ds2 = delayed_send(1000, tx);
    
    tokio::join!(obj, ds, ds2);
}

The output that I get from running this example locally is:

[polled]
[polled]
sent request
[polled]
received request
[polled]
sent request
[polled]
received request

So, although I haven't done anything with Context or Waker, ActiveObject appears to get polled at a reasonable rate, that is, more frequently than required, but not busy-waiting. What is causing ActiveObject to be woken up/polled at this rate?


Solution

  • You are passing the same Context (and thus Waker) to the poll() method of the Future returned by new_inner_task, which passes it down the chain to the poll() of the Future returned by UnboundedReceiverStream::next(). The implementation of that arranges to call wake() on this Waker at the appropriate time (when new elements appear in the channel). When that is done, Tokio polls the top-level future associated with this Waker - the join!() of the three futures.

    If you omitted the line that polls the inner task and just returned Poll::Pending instead, you would get the expected situation, where your Future would be polled once and then "hang" forever, as nothing would wake it again.