I am finding it difficult to understand why and when I need to explicitly do something with the Context
and/or its Waker
passed to the poll
method on an object for which I am implementing Future
. I have been reading the documentation from Tokio and the Async Book, but I feel the examples/methods are too abstract to be applied to real problems.
For example, I would have thought the following MRE would deadlock since the future generated by new_inner_task
would not know when a message has been passed on the MPSC channel, however, this example seems to work fine. Why is this the case?
use std::{future::Future, pin::Pin, task::{Context, Poll}, time::Duration};
use futures::{FutureExt, StreamExt}; // 0.3
use tokio::sync::mpsc; // 1.2
use tokio_stream::wrappers::UnboundedReceiverStream; // 0.1
async fn new_inner_task(rx: mpsc::UnboundedReceiver<()>) {
let mut requests = UnboundedReceiverStream::new(rx);
while let Some(_) = requests.next().await {
eprintln!("received request");
}
}
pub struct ActiveObject(Pin<Box<dyn Future<Output = ()> + Send>>);
impl ActiveObject {
pub fn new() -> (Self, mpsc::UnboundedSender<()>) {
let (tx, rx) = mpsc::unbounded_channel();
(Self(new_inner_task(rx).boxed()), tx)
}
}
impl Future for ActiveObject {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
eprintln!("[polled]");
self.get_mut().0.as_mut().poll(cx)
}
}
async fn delayed_send(delay: u64, sender: mpsc::UnboundedSender<()>) {
tokio::time::sleep(Duration::from_millis(delay)).await;
sender.send(()).unwrap();
eprintln!("sent request");
}
#[tokio::main]
async fn main() {
let (obj, tx) = ActiveObject::new();
let ds = delayed_send(500, tx.clone());
let ds2 = delayed_send(1000, tx);
tokio::join!(obj, ds, ds2);
}
The output that I get from running this example locally is:
[polled]
[polled]
sent request
[polled]
received request
[polled]
sent request
[polled]
received request
So, although I haven't done anything with Context
or Waker
, ActiveObject
appears to get polled at a reasonable rate, that is, more frequently than required, but not busy-waiting. What is causing ActiveObject
to be woken up/polled at this rate?
You are passing the same Context
(and thus Waker
) to the poll()
method of the Future returned by new_inner_task
, which passes it down the chain to the poll()
of the Future
returned by UnboundedReceiverStream::next()
. The implementation of that arranges to call wake()
on this Waker
at the appropriate time (when new elements appear in the channel). When that is done, Tokio polls the top-level future associated with this Waker
- the join!()
of the three futures.
If you omitted the line that polls the inner task and just returned Poll::Pending
instead, you would get the expected situation, where your Future
would be polled once and then "hang" forever, as nothing would wake it again.