TL;DR:
What would be an efficient way to implement this pattern in C#?
Slightly more detail:
As a mental example, say we're designing an in-memory database. Observers need to be able to await changes to any subset of rows they care about. So for the sake of this question, the interface might look as simple as:
interface ITable<TPrimaryKey, TRow>
{
    Task WaitUntilAnyRowChanged(IReadOnlySet<TPrimaryKey> keysToListenTo, CancellationToken ct);
}
So far, the following has come to my mind:
Idea 1: TaskCompletionSource on rows
Associate a TaskCompletionSource with each row. When a value changes, the system would complete the existing TaskCompletionSource associated with that row, and replace it with a new one for the next pending update. Consumers (i.e. WaitUntilAnyRowChanged) could then just Task.WhenAny(...) on all relevant ones.
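A minimal sketch of Idea 1, to make the complete-and-replace step concrete. The names `ObservableRow`, `MarkChanged`, and `RowWaiter` are hypothetical and not part of the question, and cancellation is left out for brevity.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical row type, for illustration only.
public sealed class ObservableRow
{
    private TaskCompletionSource _changed = NewSource();

    private static TaskCompletionSource NewSource() =>
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // A task that completes on the first change after this property was read.
    public Task Changed => Volatile.Read(ref _changed).Task;

    // Swap in a fresh source for the next update, then complete the old one.
    public void MarkChanged() =>
        Interlocked.Exchange(ref _changed, NewSource()).TrySetResult();
}

public static class RowWaiter
{
    // Assumes at least one row; Task.WhenAny throws on an empty sequence.
    public static Task WaitUntilAnyRowChanged(IEnumerable<ObservableRow> rows) =>
        Task.WhenAny(rows.Select(row => row.Changed));
}
```

Note that an observer only sees changes that happen after it has read `Changed`, so a change that lands between two consecutive waits would be missed in this form.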
Downsides:
Task.WhenAny(...) on thousands of TaskCompletionSource-s is likely to be very inefficient.
Idea 2: AsyncAutoResetEvent (or similar synchronization structures) on observers
We switch roles. Each row holds a List<AsyncAutoResetEvent>. Each observer will create one AsyncAutoResetEvent of its own, and register it with all rows it's interested in. On write, a row will set the signal for all of its listeners.
Downsides:
Idea 3: Observers listening to all changes
Observers could listen to a stream pushing the keys for any and all changes. Observers would be responsible for filtering it down to the subset of keys they're interested in, and potentially setting a local TaskCompletionSource or such.
Pro:
Cons:
Idea 4: Polling
Each row maintains a Box<long>, whose boxed value is the row's current version stamp.
Observers periodically poll all rows they're interested in, store the last known version, and see if any version updated since the last poll.
Pro:
Downsides:
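For comparison, Idea 4 could look roughly like the sketch below. It assumes a plain `long` field updated through `Interlocked` in place of the `Box<long>` from the description; `VersionedRow`, `VersionPoller`, and the `pollInterval` parameter are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical row type carrying a version stamp.
public sealed class VersionedRow
{
    private long _version;

    public long Version => Interlocked.Read(ref _version);

    // Called by the writer after each change.
    public void MarkChanged() => Interlocked.Increment(ref _version);
}

public static class VersionPoller
{
    public static async Task WaitUntilAnyRowChanged(
        IReadOnlyList<VersionedRow> rows, TimeSpan pollInterval, CancellationToken ct)
    {
        // Record the versions seen at the start of the wait.
        var lastKnown = new long[rows.Count];
        for (int i = 0; i < rows.Count; i++) lastKnown[i] = rows[i].Version;

        while (true)
        {
            // Task.Delay throws OperationCanceledException when ct is canceled.
            await Task.Delay(pollInterval, ct).ConfigureAwait(false);
            for (int i = 0; i < rows.Count; i++)
            {
                if (rows[i].Version != lastKnown[i]) return;
            }
        }
    }
}
```

The writer side stays cheap (one atomic increment per change), while each observer pays for a pass over its rows every `pollInterval`, and notification latency is bounded by that interval.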
Overall, I'm leaning towards idea 4 as the most feasible approach so far. But I have this maybe irrational hatred of polling.
So I'd be curious to hear thoughts - can anyone think of a better solution? It feels like this isn't a particularly niche problem. Maybe there's some standard approach to this type of issue that I'm not aware of?
This is an interesting problem. My first thought is to maintain a static dictionary with subscriptions:
private static readonly Dictionary<TPrimaryKey, HashSet<Observer>> s_subscriptions = new();
The Observer derives from TaskCompletionSource:
class Observer : TaskCompletionSource
{
    private readonly TPrimaryKey[] _keys;
    public void Subscribe(); // body shown below
    public void Complete();  // body shown below
}
An observer is subscribed by adding it to the s_subscriptions dictionary under each of the keys it observes:
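public void Subscribe()
{
    foreach (TPrimaryKey key in _keys)
    {
        // Dictionary<TKey, TValue> has no GetOrAdd, so create the set on first use.
        if (!s_subscriptions.TryGetValue(key, out HashSet<Observer> observers))
        {
            observers = new HashSet<Observer>();
            s_subscriptions[key] = observers;
        }
        observers.Add(this);
    }
}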
When a value changes, you look up the key in the dictionary and Complete all the observers that are observing it:
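if (s_subscriptions.TryGetValue(key, out HashSet<Observer> observers))
{
    // Iterate over a snapshot: Complete() removes the observer from this set,
    // and modifying a HashSet while enumerating it throws.
    foreach (Observer observer in new List<Observer>(observers))
    {
        observer.Complete();
    }
}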
When an observer is completed, it removes itself from s_subscriptions:
public void Complete()
{
    foreach (TPrimaryKey key in _keys)
    {
        s_subscriptions[key].Remove(this);
    }
    base.SetResult(); // Completes the base.Task
}
You may have to instantiate each Observer with the TaskCreationOptions.RunContinuationsAsynchronously option, so that its continuations run on the ThreadPool instead of running synchronously on the thread that modifies the value.
To minimize the pressure on the garbage collector, you could consider using ValueTasks instead of TaskCompletionSources, backed by reusable IValueTaskSource implementations. But that might be too much work for too little benefit.
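Putting the pieces of this answer together, a self-contained sketch might look like the following. It is one possible arrangement under stated assumptions, not a definitive implementation: the class name `ChangeNotifier`, the `NotifyChanged` method, the instance-level (rather than static) dictionary, the lock, and the cancellation handling are all additions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; the name and the locking scheme are assumptions.
public sealed class ChangeNotifier<TPrimaryKey> where TPrimaryKey : notnull
{
    private readonly object _gate = new();
    private readonly Dictionary<TPrimaryKey, HashSet<Observer>> _subscriptions = new();

    private sealed class Observer : TaskCompletionSource
    {
        public readonly TPrimaryKey[] Keys;

        public Observer(TPrimaryKey[] keys)
            : base(TaskCreationOptions.RunContinuationsAsynchronously) => Keys = keys;
    }

    public async Task WaitUntilAnyRowChanged(
        IReadOnlySet<TPrimaryKey> keysToListenTo, CancellationToken ct)
    {
        var observer = new Observer(keysToListenTo.ToArray());
        lock (_gate)
        {
            foreach (TPrimaryKey key in observer.Keys)
            {
                if (!_subscriptions.TryGetValue(key, out var set))
                {
                    set = new HashSet<Observer>();
                    _subscriptions[key] = set;
                }
                set.Add(observer);
            }
        }

        // On cancellation, unsubscribe and move the task to the canceled state.
        using (ct.Register(() =>
        {
            lock (_gate) { Unsubscribe(observer); }
            observer.TrySetCanceled(ct);
        }))
        {
            await observer.Task.ConfigureAwait(false);
        }
    }

    // Call this after the row with the given key has been written.
    public void NotifyChanged(TPrimaryKey key)
    {
        Observer[] toComplete;
        lock (_gate)
        {
            if (!_subscriptions.TryGetValue(key, out var set)) return;
            toComplete = set.ToArray(); // snapshot, because Unsubscribe mutates the set
            foreach (Observer observer in toComplete) Unsubscribe(observer);
        }
        // Complete outside the lock so the lock is never held while tasks complete.
        foreach (Observer observer in toComplete) observer.TrySetResult();
    }

    // Must be called with _gate held.
    private void Unsubscribe(Observer observer)
    {
        foreach (TPrimaryKey key in observer.Keys)
        {
            if (_subscriptions.TryGetValue(key, out var set)
                && set.Remove(observer) && set.Count == 0)
            {
                _subscriptions.Remove(key); // drop empty sets so the dictionary does not grow
            }
        }
    }
}
```

With this shape, a write costs one dictionary lookup plus work proportional to the number of observers of that key, and an observer waiting on N keys costs N set insertions on subscribe and N removals on completion, with no Task.WhenAny over per-row tasks.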