Polling future with another task waker


I am trying to implement a sink wrapper that delegates flushing to another task. Basically:

  • wrapper spawns a service task when constructed
  • service task in a loop runs flush() on underlying stream and pauses if poll_flush() returns Ready
  • when client sends data into the wrapped sink, it feed()s data to underlying sink and then calls inner.poll_flush() with a context holding waker for service task

The idea should be pretty clear -- clients do not need to bother with flushing the sink, and yet it gets flushed automatically as soon as the underlying socket/whatever becomes ready.

(to avoid dealing with multi-threading issues assume all this happens within the same LocalSet)

I can't figure out how to build the aforementioned Waker for the service task -- is it even possible in Tokio? If not -- is it possible to ask the runtime to wake up another task using its handle?


Solution

  • I figured it out:

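    use std::future::poll_fn;
    use std::task::{Context, Poll, Waker};

    use futures::task::noop_waker_ref;

    #[derive(Default)]
    struct LocalNotify {
        // Set while a notification is pending for the waiting task.
        is_on: bool,
        // Waker of the task currently parked in poll_notified(), if any.
        waker: Option<Waker>,
    }

    impl LocalNotify {
        // Polls `f` with the waiting task's waker instead of the caller's, so any
        // wakeup that `f` registers is delivered to the waiting task.
        // Returns None if `f` is still pending.
        fn delegate_poll<T, F: FnOnce(&mut Context<'_>) -> Poll<T>>(
            &mut self,
            f: F,
        ) -> Option<T> {
            // If no task is waiting, fall back to a no-op waker.
            let w = self.waker.as_ref().unwrap_or_else(|| noop_waker_ref());
            match f(&mut Context::from_waker(w)) {
                Poll::Pending => {
                    // Leave a notification pending so the waiting task sees it
                    // on its next poll_notified() call.
                    self.is_on = true;
                    None
                }
                Poll::Ready(r) => Some(r),
            }
        }

        // Wakes the waiting task unless a notification is already pending.
        fn notify(&mut self) {
            if !self.is_on {
                self.is_on = true;
                if let Some(w) = self.waker.take() {
                    w.wake();
                }
            }
        }

        // Ready once a notification is pending; otherwise stores the caller's waker.
        fn poll_notified(&mut self, cx: &mut Context<'_>) -> Poll<()> {
            if self.is_on {
                self.is_on = false;
                self.waker = None;
                Poll::Ready(())
            } else {
                // Only clone the waker if the stored one would not wake this task.
                if self.waker.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) {
                    self.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
        }

        async fn notified(&mut self) {
            poll_fn(|cx| self.poll_notified(cx)).await
        }
    }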
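
The trick behind delegate_poll can be seen in isolation with nothing but std. In the rough sketch below, `CountingTask` and `FakeSocket` are hypothetical stand-ins (not Tokio types): the "client" polls the socket, but with a Context built from the "service" task's waker, so the later wakeup goes to the service task and not to the client.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a task: just counts how often it was woken.
struct CountingTask(AtomicUsize);

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a socket: remembers the waker of whoever polled it last.
#[derive(Default)]
struct FakeSocket {
    ready: bool,
    waiter: Option<Waker>,
}

impl FakeSocket {
    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.ready {
            Poll::Ready(())
        } else {
            // Register the waker found in the Context, whichever task it belongs to.
            self.waiter = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn become_ready(&mut self) {
        self.ready = true;
        if let Some(w) = self.waiter.take() {
            w.wake();
        }
    }
}

/// Returns (client wakeups, service wakeups).
fn demo() -> (usize, usize) {
    let client = Arc::new(CountingTask(AtomicUsize::new(0)));
    let service = Arc::new(CountingTask(AtomicUsize::new(0)));
    let service_waker = Waker::from(Arc::clone(&service));
    let mut socket = FakeSocket::default();

    // The client does the polling, but hands over the service task's waker.
    let mut cx = Context::from_waker(&service_waker);
    let first = socket.poll_flush(&mut cx);
    assert!(first.is_pending());

    // When the socket becomes ready, only the service task is woken.
    socket.become_ready();
    (
        client.0.load(Ordering::SeqCst),
        service.0.load(Ordering::SeqCst),
    )
}
```

In LocalNotify the service waker is not built by hand like this; it is the clone of cx.waker() that poll_notified() stored while the service task was parked.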
    

    So, to delegate the poll_flush wakeup to another task (one that is waiting on LocalNotify), you can:

    fn poll_send(&mut self, cx: &mut Context<'_>, item: &String) -> Poll<Result<()>> {
        ...
        ready!(<MyStream as SinkExt<String>>::poll_ready_unpin(inner, cx))?;
    
        let initiate_flush = inner.write_buffer().is_empty();
    
        inner.start_send_unpin(item.clone())?;
    
        if initiate_flush {
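            // delegate_poll() returns Option, not Poll, so ready!() does not apply here.
            // None means the flush is still pending and the service task will be woken to finish it.
            if let Some(res) = self.svc_notify.delegate_poll(|cx| <MyStream as SinkExt<String>>::poll_flush_unpin(inner, cx)) {
                res?;
            }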
        // ... or
        // self.svc_notify.notify();
        ...
    

    (Naturally, the service task should be running inner.flush() inside a select! wrapped in a loop. You'll have to use poll_fn() and RefCell so that &mut is borrowed only momentarily during the related poll calls.)
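
A rough, std-only sketch of that RefCell + poll_fn() pattern follows. Everything in it is an assumption made for illustration: `Shared`, `send`, `flush` and `WakeCounter` are hypothetical names, the "flush" just moves strings between two Vecs, and the task is polled by hand in place of a LocalSet and select!. The point it tries to show is that borrow_mut() is taken inside the poll closure, so no RefCell borrow is held across an .await.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state: a write buffer plus a simplified notify flag.
#[derive(Default)]
struct Shared {
    buffer: Vec<String>,
    flushed: Vec<String>,
    notified: bool,
    waker: Option<Waker>,
}

impl Shared {
    /// Client side: queue an item and wake the service task.
    fn send(&mut self, item: &str) {
        self.buffer.push(item.to_owned());
        self.notified = true;
        if let Some(w) = self.waker.take() {
            w.wake();
        }
    }

    /// Service side: ready once `send` was called since the last wait.
    fn poll_notified(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if std::mem::take(&mut self.notified) {
            Poll::Ready(())
        } else {
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Stand-in for a real flush: moves everything queued into `flushed`.
    fn flush(&mut self) {
        self.flushed.append(&mut self.buffer);
    }
}

/// Service task: wait for a notification, flush, repeat.
async fn service(shared: Rc<RefCell<Shared>>) {
    loop {
        // The RefCell is borrowed only for the duration of each poll call.
        poll_fn(|cx| shared.borrow_mut().poll_notified(cx)).await;
        shared.borrow_mut().flush();
    }
}

/// Counts wakeups; stands in for the executor's scheduling.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (flushed items, number of wakeups the service task received).
fn demo() -> (Vec<String>, usize) {
    let shared = Rc::new(RefCell::new(Shared::default()));
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut task = pin!(service(Rc::clone(&shared)));

    // Nothing queued yet: the service task parks and leaves its waker behind.
    assert!(task.as_mut().poll(&mut cx).is_pending());

    // The client can borrow the state freely while the service task is parked.
    shared.borrow_mut().send("hello");

    // A real executor would re-poll because of the wakeup; here it is done by hand.
    assert!(task.as_mut().poll(&mut cx).is_pending());

    let flushed = shared.borrow().flushed.clone();
    (flushed, counter.0.load(Ordering::SeqCst))
}
```

With a real sink, the flush step would itself be a poll_fn() around poll_flush_unpin(), borrowing the RefCell in the same momentary way.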