I am trying to implement a sink wrapper that delegates flushing to another task. Basically:
flush()
on underlying stream and pauses if poll_flush()
returns Ready
feed()
s data to underlying sink and then calls inner.poll_flush()
with a context holding waker for service taskIdea should be pretty clear -- clients do not need to bother with flushing the sink and yet it get flushed automatically as soon as underlying socket/whatever becomes ready.
(to avoid dealing with multi-threading issues assume all this happens within the same LocalSet
)
I can't figure out how to built aforementioned Waker
for service task -- is it even possible in Tokio? If not -- is it possible to ask runtime to wake up another task using it's handle?
I figured it out:
#[derive(Default)]
struct LocalNotify {
is_on: bool,
waker: Option<Waker>,
}
impl LocalNotify {
fn delegate_poll<T, F: FnOnce(&mut Context<'_>) -> Poll<T>>(
&mut self,
f: F,
) -> Option<T> {
let w = self.waker.as_ref().unwrap_or_else(|| noop_waker_ref());
match f(&mut Context::from_waker(w)) {
Poll::Pending => {
self.is_on = true;
None
},
Poll::Ready(r) => Some(r),
}
}
fn notify(&mut self) {
if !self.is_on {
self.is_on = true;
self.waker.take().map(|w| w.wake());
}
}
fn poll_notified(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if self.is_on {
self.is_on = false;
self.waker = None;
return Poll::Ready(());
} else {
if self.waker.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) {
self.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
}
async fn notified(&mut self) {
poll_fn(|cx| self.poll_notified(cx)).await
}
}
So, if you want to delegate poll_flush
wakeup to another task (that is waiting on LocalNotify
) you can:
fn poll_send(&mut self, cx: &mut Context<'_>, item: &String) -> Poll<Result<()>> {
...
ready!(<MyStream as SinkExt<String>>::poll_ready_unpin(inner, cx))?;
let initiate_flush = inner.write_buffer().is_empty();
inner.start_send_unpin(item.clone())?;
if initiate_flush {
ready!(self.svc_notify.delegate_poll(|cx| <MyStream as SinkExt<String>>::poll_flush_unpin(inner, cx)))?;
// ... or
// self.svc_notify.notify();
...
(naturally that task should be running inner.flush()
in select!
wrapped in a loop. You'll have to use poll_fn()
and RefCell
to be able to borrow &mut
momentarily during related poll
calls)