I am making my own channel implementation, but std::task::Context
doesn't make it clear how the waker was generated.
My fake code:
struct MyAtomicWaker {
lock: SpinLock,
is_waked: AtomicBool,
waker: std::task::Waker,
}
struct WeakAtomicWaker (Weak<MyAtomicWaker>)
impl MyAtomicWaker {
fn is_waked(&self) -> bool {}
fn weak(self: Arc<MyAtomicWaker>) -> WeakAtomicWaker;
fn cancel(&self) {} // nullify WeakAtomicWaker, means the waker is not waked by a future
}
impl WeakAtomicWaker {
fn wake(self) {} // upgrade to arc and can wake only once when waker not cancelled
}
struct ReceiveFuture<T> {
waker: Option<Arc<MyAtomicWaker>>,
}
impl<T> Drop for ReceiveFuture<T> {
fn drop(&mut self) {
if let Some(waker) = self.waker.take() { waker.cancel(); }
}
}
impl<T> Future for ReceiveFuture<T> {
type Output = Result<(), SendError<T>>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let _self = self.get_mut();
if _self.waker.is_none() {
let my_waker = _self.reg_waker(ctx.waker().clone()); // wrap the waker with Arc, store it inside _self, and send the weak ref to other side of channel
_self.waker.replace(my_waker);
}
// do some polling
match _self.recv.try_recv() {
Ok(item)=>{
if let Some(waker) = _self.waker.take() {
waker.cancel();
}
return Poll::Ready(item); //canncel my waker and ready
},
Err(TryRecvError)=>{
if let Some(waker) = _self.waker.as_ref() {
if waker.is_wake() { // the waker is triggered but it's a false alarm, should make a new one.
let my_waker = self.reg_waker(ctx.waker().clone());
_self.waker.replace(my_waker);
} else { // the waker has not trigger, we do not have to make a new one ?
}
}
return Poll::Pending;
},
Err(...)
}
}
}
Is it necessary to register a new waker every time poll()
is called? In my code, there's a lot of timeouts and looping selects due to the combination of different futures.
I have a little experiment that works on the playground, but I'm not sure whether it will always work fine for both Tokio and async-std in various settings.
In my production code, I register a new waker and cancel the old waker in every poll()
call. I don't know whether it is safe to only register a waker the first time and reuse it on the next polls.
Given the following order:
f.reg_waker(waker1)
f.poll()
gets Poll::Pending
f.poll()
gets Poll::Pending
will waker1.wake()
guarantee to wake up f
after that?
I'm asking this because:
I have a Stream
that multiplexes multiple receiving channels
My MPMC and MPSC channel implementations are lockless. Some channels inside a multiplex selection may be used as a close notification channel and seldom gets message. When I'm polling it a lot (say a million times), it will lead to a million waker thrown to the other side (which looks like a memory leak). Canceling previous wakers produced in the same future without lock is more complex logic than an implementation with lock
For these reasons, I have a waker canceling solution that leads to fairness problem, which needs to be avoided as much as possible
I'm not interested in what the book states, or what the API laws declare; I'm only interested in how the low level is implemented. Show some code why this works or why this does not work would be helpful. I code to implement product; if necessary I will stick to a specified dependency or do some hacking in order to get the job done until I have a better way.
Given the fact that "a waker can be waked in parallel of Future::poll
.
Counter-evidence: Presuming every time a waker must be clone()
and re-registered in order for this future to wake up properly, this will make previous waker invalid, so it's not possible for "concurrent wake up from different thread, e.g. future::select
block". The conclusion is not true, so it counter-proves this statement:
"A waker is always valid from the time starting from ctx.waker().clone()
until waker.wake()
". This serves my motive. waker is not needed to reset every time, if it's not used to woken yet.
In addition to investigate tokio waker implementation, all RawWaker produce by ctx.waker.clone() is just a ref_count to a manual drop memory entry on the heap, if wakers clone outside prevent the ref_count dropping zero, the real waker entry always exists.