
Async waiting for changes to a subset of rows in a large in-memory table


TL;DR:

  • I have an in-memory database that maintains a large (10-50 million) number of rows
  • I need an efficient way for an observer to start listening to any subset of these (typically around 10,000 items), and asynchronously wait until any of them has changed
  • The system needs to be prepared to have at least 50,000 such observers active at the same time
  • Assume around 1,000 - 10,000 rows per second will be written/updated

What would be an efficient way to implement this pattern in C#?


Slightly more detail:

As a mental example, say we're designing an in-memory database. Observers need to be able to await changes to any subset of rows they care for. So for the sake of this question, the interface might look as simple as

interface ITable<TPrimaryKey, TRow>
{
  Task WaitUntilAnyRowChanged(IReadOnlySet<TPrimaryKey> keysToListenTo, CancellationToken ct);
}

So far, the following has come to my mind:

Idea 1: TaskCompletionSource on rows

Associate a TaskCompletionSource with each row. When a value changes, the system would complete the existing TaskCompletionSource associated with that row, and replace it with a new one for the next pending update. Consumers (i.e. WaitUntilAnyRowChanged) could then just Task.WhenAny(...) on all relevant ones.

Downsides:

  1. I would expect chaining Task.WhenAny(...) on thousands of TaskCompletionSources to be very inefficient
  2. Not the biggest issue, but requires a relatively large amount of allocations
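For concreteness, here's a rough sketch of what Idea 1 could look like (all type and member names are invented for illustration). Each row owns a non-generic TaskCompletionSource that a writer completes and swaps on every update, and waiting degenerates into a large Task.WhenAny:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of Idea 1: each row owns a TaskCompletionSource that
// writers complete and swap on every update.
class Row<TValue>
{
    private TaskCompletionSource _changed =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public TValue Value { get; private set; }

    // The task waiters attach to; completed on the next write.
    public Task Changed => Volatile.Read(ref _changed).Task;

    public void Write(TValue value)
    {
        Value = value;
        // Install a fresh source for the next update, then release
        // everyone awaiting the old one.
        TaskCompletionSource previous = Interlocked.Exchange(
            ref _changed,
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
        previous.SetResult();
    }
}

static class TableWaiter
{
    // The WhenAny over thousands of tasks is exactly the cost called out
    // in downside 1 above.
    public static Task WaitUntilAnyRowChanged<TValue>(IEnumerable<Row<TValue>> rows) =>
        Task.WhenAny(rows.Select(r => r.Changed));
}
```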

Idea 2: AsyncAutoResetEvent (or similar synchronization structures) on observers

We switch roles. Each row holds a List<AsyncAutoResetEvent>. Each observer will create one AsyncAutoResetEvent of their own, and register it with all rows they're interested in. On write, a row will set the signal for all of its listeners.

Downsides:

  1. This has the potential to significantly slow down the write speed of the database; what could otherwise be a simple dictionary write now becomes a write plus a loop through potentially thousands of observers, triggering their AsyncAutoResetEvents
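A minimal sketch of Idea 2, to make the write-path cost concrete. Note that AsyncAutoResetEvent is not in the BCL (Nito.AsyncEx ships one); the stand-in below is hand-rolled from a swapped TaskCompletionSource, and all names are illustrative:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Minimal stand-in for AsyncAutoResetEvent (not a BCL type), built from a
// TaskCompletionSource that is replaced on each signal.
class AsyncAutoResetEvent
{
    private TaskCompletionSource _tcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Waiters await the current task; it is swapped out on each Set().
    public Task WaitAsync() => Volatile.Read(ref _tcs).Task;

    public void Set() =>
        Interlocked.Exchange(ref _tcs,
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))
        .TrySetResult();
}

class Row
{
    public readonly List<AsyncAutoResetEvent> Listeners = new();

    public void NotifyWrite()
    {
        // This loop is the downside: every write now touches every
        // registered listener of the row.
        lock (Listeners)
            foreach (AsyncAutoResetEvent listener in Listeners)
                listener.Set();
    }
}
```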

Idea 3: Observers listening to all changes

Observers could listen to a stream pushing the keys for any and all changes. Observers would be responsible for filtering it down to the subset of keys they're interested in, and potentially setting a local TaskCompletionSource or such.

Pro:

  1. Simple to implement
  2. Relatively predictable performance (could dedicate x number of threads to forwarding these events to observers)

Cons:

  1. Quite inefficient if an observer only cares for a small subset of rows. Imagine only listening to 1 row in a table of 50 million rows.
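The observer side of Idea 3 could be sketched with System.Threading.Channels (the table-side plumbing that fans changed keys out into per-observer channels is assumed here, not shown):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class FirehoseObserver
{
    // The table is assumed to push every changed key into this observer's
    // channel; the observer filters the firehose down to its own keys.
    public static async Task WaitUntilAnyRowChanged<TKey>(
        ChannelReader<TKey> allChanges,
        IReadOnlySet<TKey> keysToListenTo,
        CancellationToken ct)
    {
        await foreach (TKey changedKey in allChanges.ReadAllAsync(ct))
        {
            // The inefficiency from the con above: we see (and discard)
            // every change in the table, even if we only care about one
            // key out of 50 million.
            if (keysToListenTo.Contains(changedKey))
                return;
        }
    }
}
```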

Idea 4: Polling

Each row maintains a Box<long>, whose boxed value is a version stamp for the row. Observers periodically poll all rows they're interested in, store the last known version, and check whether any version has changed since the last poll.

Pro:

  1. Simple
  2. Feels like it could scale well: Write speed isn't affected. And for an observer to loop through ~10k boxed longs to read & compare their values should be reasonably fast.
  3. Pretty much GC free

Downsides:

  1. Nobody likes polling! Change detection is delayed by up to the poll interval, rather than being picked up immediately
  2. May not scale that well with an increasing number of observed keys
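A sketch of Idea 4 (illustrative names; a plain long field bumped with Interlocked is used here in place of a Box<long>, which amounts to the same thing):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Each row carries a version stamp that writers bump; observers snapshot
// the stamps and poll until any of them moves.
class VersionedRow
{
    private long _version;

    public long Version => Interlocked.Read(ref _version);

    // Called by the writer on every update; this is the only write-path cost.
    public void MarkChanged() => Interlocked.Increment(ref _version);
}

static class PollingObserver
{
    public static async Task WaitUntilAnyRowChanged(
        IReadOnlyList<VersionedRow> rows,
        TimeSpan pollInterval,
        CancellationToken ct)
    {
        // Snapshot the versions we start from.
        long[] snapshot = new long[rows.Count];
        for (int i = 0; i < rows.Count; i++)
            snapshot[i] = rows[i].Version;

        while (true)
        {
            // Downside 1 above: detection is delayed by up to this interval.
            await Task.Delay(pollInterval, ct);
            for (int i = 0; i < rows.Count; i++)
                if (rows[i].Version != snapshot[i])
                    return; // at least one observed row changed
        }
    }
}
```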

Overall, I'm leaning towards idea 4 as the most feasible approach so far. But I have this maybe irrational hatred of polling...

So I'd be curious to hear thoughts - can anyone think of a better solution? It feels like this isn't a particularly niche problem. Maybe there's some standard approach to this type of issue that I'm not aware of?


Solution

  • This is an interesting problem. My first thought is to maintain a static dictionary with subscriptions:

    private static readonly ConcurrentDictionary<TPrimaryKey, HashSet<Observer>> s_subscriptions = new();
    

    The Observer derives from TaskCompletionSource:

    class Observer : TaskCompletionSource
    {
        private readonly TPrimaryKey[] _keys;
    
        public void Subscribe();
        public void Complete();
    }
    

    An observer subscribes by adding itself under each key it observes in the s_subscriptions dictionary:

        public void Subscribe()
        {
            foreach (TPrimaryKey key in _keys)
            {
                s_subscriptions.GetOrAdd(key, _ => new HashSet<Observer>()).Add(this);
            }
        }
    

    When a value changes, you go to the dictionary and Complete all the observers that are observing this key:

    if (s_subscriptions.TryGetValue(key, out HashSet<Observer> observers))
    {
        foreach (Observer observer in observers.ToArray()) // snapshot: Complete() mutates the set
        {
            observer.Complete();
        }
    }
    

    When an observer is completed, it removes itself from the s_subscriptions:

        public void Complete()
        {
            foreach (TPrimaryKey key in _keys)
            {
                s_subscriptions[key].Remove(this);
            }
            base.TrySetResult(); // Completes the base.Task; Try- in case two observed keys change at once
        }
    

    You may have to instantiate each Observer with the TaskCreationOptions.RunContinuationsAsynchronously option, so that it completes on the ThreadPool instead of completing on the same thread that modifies the value.

    To minimize pressure on the garbage collector you could consider using ValueTasks instead of TaskCompletionSources, backed by reusable IValueTaskSource implementations. But that might be too much work for too little benefit.
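    Putting the fragments together, a self-contained sketch might look like the following. The key type is made generic, and a per-set lock is added as an assumption to keep Subscribe/Complete and the writer's notification from racing (HashSet itself is not thread-safe, even inside a ConcurrentDictionary):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Consolidated, hedged version of the fragments above; requires .NET 5+
// for the non-generic TaskCompletionSource base.
class Observer<TKey> : TaskCompletionSource
{
    private readonly ConcurrentDictionary<TKey, HashSet<Observer<TKey>>> _subscriptions;
    private readonly TKey[] _keys;

    public Observer(
        ConcurrentDictionary<TKey, HashSet<Observer<TKey>>> subscriptions,
        TKey[] keys)
        : base(TaskCreationOptions.RunContinuationsAsynchronously)
    {
        _subscriptions = subscriptions;
        _keys = keys;
    }

    public void Subscribe()
    {
        foreach (TKey key in _keys)
        {
            HashSet<Observer<TKey>> set =
                _subscriptions.GetOrAdd(key, _ => new HashSet<Observer<TKey>>());
            lock (set) set.Add(this);
        }
    }

    public void Complete()
    {
        foreach (TKey key in _keys)
        {
            if (_subscriptions.TryGetValue(key, out HashSet<Observer<TKey>> set))
                lock (set) set.Remove(this);
        }
        TrySetResult(); // Try-: two observed keys changing at once may race here
    }

    // Called by the table whenever the row with this key is written.
    public static void NotifyChanged(
        ConcurrentDictionary<TKey, HashSet<Observer<TKey>>> subscriptions, TKey key)
    {
        if (!subscriptions.TryGetValue(key, out HashSet<Observer<TKey>> set))
            return;
        Observer<TKey>[] snapshot;
        lock (set) snapshot = set.ToArray(); // Complete() mutates the set, so snapshot first
        foreach (Observer<TKey> observer in snapshot)
            observer.Complete();
    }
}
```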