I searched hard for a piece of code that does what I want and that I am happy with. Reading this and this helped a lot.
I have a scenario where I need a single consumer to be notified by a single producer when new data is available, but I would also like the consumer to be notified periodically regardless of whether new data is available. It is fine if the consumer is notified more often than the recurring period, but it should not be notified less frequently.
It is possible that multiple notifications for 'new data' occur while the consumer is already notified and working (so SemaphoreSlim was not a good fit).
Hence, a consumer that is slower than the rate of producer notifications would not queue up subsequent notifications; they would just "re-signal" the same "data available" flag without effect.
I would also like the consumer to asynchronously wait for the notifications (without blocking a thread).
I have stitched together the class below, which wraps a TaskCompletionSource and also uses an internal Timer.
using System;
using System.Threading;
using System.Threading.Tasks;

public class PeriodicalNotifier : IDisposable
{
    // Need some dummy type since TaskCompletionSource has only the generic version
    internal struct VoidTypeStruct { }

    // Always reuse this allocation
    private static VoidTypeStruct dummyStruct;

    private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
    private Timer reSendTimer;

    public PeriodicalNotifier(int autoNotifyIntervalMs)
    {
        internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
        {
            await internalCompletionSource.Task;
            // Recreate - to be able to set again upon the next wait
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        }
    }

    public void Notify()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    public void Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}
Users of this class can do something like this:
private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

// ... In some task - which should be non-blocking
while (some condition)
{
    await notifier.WaitForNotifictionAsync(_tokenSource.Token);
    // Do some work...
}

// ... In some thread, producer added new data
notifier.Notify();
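For reference, the "re-signal without effect" behaviour described above comes from TrySetResult itself: only the first call on a given TaskCompletionSource completes the task, and later calls simply return false. A minimal sketch of that, where the CoalescingDemo class and its SignalTwice method are names made up purely for illustration:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, not part of PeriodicalNotifier.
public static class CoalescingDemo
{
    // Reports what two back-to-back TrySetResult calls return,
    // plus whether the underlying task ended up completed.
    public static (bool First, bool Second, bool Completed) SignalTwice()
    {
        var source = new TaskCompletionSource<bool>();
        bool first = source.TrySetResult(true);   // completes the task
        bool second = source.TrySetResult(true);  // task already completed: no effect
        return (first, second, source.Task.IsCompleted);
    }
}
```

Calling CoalescingDemo.SignalTwice() returns (true, false, true): the second signal is absorbed rather than queued, which is the property the notifier relies on.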
Efficiency is important to me; the scenario is a high-frequency data stream, and so I had in mind:
TaskCompletionSource
My questions are:
Update:
I have reached the conclusion that, aside from reimplementing a leaner task-completion structure (like in here and here), I have no more optimizations to make. I hope that helps anyone looking at a similar scenario.
The TaskCompletionSource recreation should be outside the using scope; otherwise the "old" cancellation token may cancel the "new" TaskCompletionSource.
Using an AsyncManualResetEvent combined with a Timer would be simpler and less error-prone. There's a very nice namespace with async tools in the Visual Studio SDK by Microsoft. You need to install the SDK and then reference the Microsoft.VisualStudio.Threading assembly. Here's an implementation using their AsyncManualResetEvent with the same API:
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Threading;

public class PeriodicalNotifier : IDisposable
{
    private readonly Timer _timer;
    private readonly AsyncManualResetEvent _asyncManualResetEvent;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        _asyncManualResetEvent = new AsyncManualResetEvent();
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken);
        _asyncManualResetEvent.Reset();
    }

    public void Notify()
    {
        _asyncManualResetEvent.Set();
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
You notify by setting the reset event, asynchronously wait using WaitAsync, enable cancellation using the WithCancellation extension method, and then reset the event. Multiple notifications are "merged" by setting the same reset event.
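For readers who would rather keep the TaskCompletionSource version, moving the recreation out of the using scope could look roughly like the sketch below. It is an illustration under assumptions, not a drop-in replacement: the CoalescingNotifier class, its NewSource helper and the WaitForNotificationAsync spelling are made up here, the timer is left out for brevity, a single consumer is assumed, and TaskCompletionSource<bool> stands in for the dummy struct.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: single producer, single consumer, no timer.
public sealed class CoalescingNotifier
{
    private TaskCompletionSource<bool> _source = NewSource();

    // Continuations run asynchronously so Notify() never executes consumer code inline.
    private static TaskCompletionSource<bool> NewSource() =>
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public async Task WaitForNotificationAsync(CancellationToken cancellationToken)
    {
        // Capture the current source so the callback can only ever cancel this one.
        var current = Volatile.Read(ref _source);
        try
        {
            using (cancellationToken.Register(() => current.TrySetCanceled()))
            {
                await current.Task.ConfigureAwait(false);
            }
        }
        finally
        {
            // The registration is disposed by now; install a fresh source for the
            // next wait, whether this wait was signalled or cancelled.
            Interlocked.CompareExchange(ref _source, NewSource(), current);
        }
    }

    // Repeated calls before the next wait collapse into a single signal.
    public void Notify() => Volatile.Read(ref _source).TrySetResult(true);
}
```

One caveat with this sketch: a Notify() that lands between the task completing and the fresh source being installed is absorbed by the already-completed source. That is probably acceptable here, since the consumer is about to run anyway and the periodic timer would signal again, but it is worth knowing about.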