c# · asynchronous · concurrency · producer-consumer · iasyncenumerable

High-performance conflating producer/consumer pattern


Say you have the following interface:

interface IConflateWorkByKey<TKey, TValue>
{
  IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct = default);
  void Publish(TKey key, TValue value);
}

This is intended to describe a work-queue-like pattern: producers can queue up items via Publish, and a single consumer drains items via GetValues, whereby:

  • Values should be conflated by key. That is, if producers call Publish(1, "hello") followed by Publish(1, "world") before the consumer has had a chance to drain the next batch (dictionary), then the next dictionary the consumer receives should contain key: 1, value: "world". The prior update (1, "hello") is dropped and never seen by the consumer
  • Publish will typically be called much faster than items can be drained. Invocations of Publish should therefore be as fast as possible, while draining items is not quite as performance-critical
  • It is very likely that in most practical cases, when the iterator returned from GetValues is advanced by the consumer, new items will already be available and no actual waiting is required; a fast-path optimization for this case would be useful. However, an implementation must be prepared for the opposite case, and then asynchronously wait for new items to become available
  • There will only be one consumer (i.e.: GetValues will only ever be called/consumed by 1 consumer)
  • Publish will not be called concurrently (though it may be called sequentially from different threads)
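To make the conflation semantics concrete, here is a hypothetical usage sketch (Conflate<int, string> stands in for any implementation of the interface; the values are made up):

```csharp
// Hypothetical usage; Conflate<int, string> is any implementation of the interface.
IConflateWorkByKey<int, string> queue = new Conflate<int, string>();

// Producer side: the second value for key 1 overwrites the first.
queue.Publish(1, "hello");
queue.Publish(1, "world");
queue.Publish(2, "foo");

// Consumer side: assuming no drain happened in between, the first batch
// would be { 1: "world", 2: "foo" } - (1, "hello") is never observed.
await foreach (Dictionary<int, string> batch in queue.GetValues())
{
    foreach ((int key, string value) in batch)
        Console.WriteLine($"{key}: {value}");
}
```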

My current implementation looks as follows:

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
  private Dictionary<TKey,TValue>? _buffered = null;
  private readonly object _lock = new();

  public async IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(
    [EnumeratorCancellation] CancellationToken ct = default)
  {
    while(!ct.IsCancellationRequested)
    {
      Dictionary<TKey,TValue> result;
      lock(_lock)
      {
        while(_buffered is null)
          Monitor.Wait(_lock); // blocks the calling thread - this is the problem

        result = _buffered;
        _buffered = null;
      }
      yield return result;
    }
  }

  public void Publish(TKey key, TValue value)
  {
    lock(_lock)
    {
      _buffered ??= new();
      _buffered[key] = value;
      Monitor.Pulse(_lock);
    }
  }
}

Note that I'm open to changing the Publish method to return a ValueTask if it would be optimal for a particular implementation.

This works alright in principle, but the major problem is that the implementation of GetValues here is not actually asynchronous: the calling thread is genuinely blocked in Monitor.Wait.

I have attempted this pattern with AsyncMonitor from Nito.AsyncEx as well - but unfortunately, AsyncMonitor.Pulse is significantly too slow.

Can anyone think of a more clever implementation/pattern which is blazing fast in terms of publishing values, while at the same time allowing for truly async waiting/signaling from within GetValues?


Edit: Here is another idea. I have yet to think through whether this is correct, and to measure performance, but I'm listing it here for debate. Of course I'm still curious about other ideas!

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
  private Dictionary<TKey,TValue> _buffered = new();
  private readonly object _lock = new();
  private TaskCompletionSource? _tcs = null;

  public async IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(
    [EnumeratorCancellation] CancellationToken ct = default)
  {
    while(!ct.IsCancellationRequested)
    {
      Dictionary<TKey,TValue> result;

      while(true) {
        Task waitTask;
        lock(_lock)
        {
          if(_buffered.Any())
          {
            // "Fast path" - next result is already available, take it without waiting
            result = _buffered;
            _buffered = new();
            break;
          }
          _tcs = new();
          waitTask = _tcs.Task; // capture inside the lock; Publish may null out _tcs
        }
        await waitTask;
      }

      yield return result;
    }
  }

  public void Publish(TKey key, TValue value)
  {
    lock(_lock)
    {
      _buffered[key] = value;

      if(_tcs is not null)
      {
        _tcs.TrySetResult();
        _tcs = null; // "Fast path", next invocation of publish doesn't even need to call TrySetResult() if values weren't drained in between
      }
    }
  }
}

Solution

  • Here is an implementation. It uses a TaskCompletionSource<T> for the case when the consumer attempts to consume a batch at a moment that the dictionary is empty:

    // Requires: using System.Diagnostics; and using System.Runtime.CompilerServices;
    class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
    {
        private readonly object _locker = new();
        private TaskCompletionSource<Dictionary<TKey, TValue>> _tcs;
        private Dictionary<TKey, TValue> _dictionary;
    
        public void Publish(TKey key, TValue value)
        {
            lock (_locker)
            {
                if (_tcs is not null)
                {
                    Debug.Assert(_dictionary is null);
                    _tcs.SetResult(new() { { key, value } });
                    _tcs = null;
                }
                else
                {
                    _dictionary ??= new();
                    _dictionary[key] = value;
                }
            }
        }
    
        public async IAsyncEnumerable<Dictionary<TKey, TValue>> GetValues(
            [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
            {
                lock (_locker)
                {
                    if (_tcs is not null)
                    {
                        _tcs.SetCanceled(cancellationToken);
                        _tcs = null;
                    }
                }
            });
    
            while (true)
            {
                Dictionary<TKey, TValue> result = null;
                Task<Dictionary<TKey, TValue>> taskResult = null;
                lock (_locker)
                {
                    if (_tcs is not null) throw new InvalidOperationException(
                        "Multiple consumers are not supported.");
                    cancellationToken.ThrowIfCancellationRequested();
                    if (_dictionary is not null)
                    {
                        result = _dictionary;
                        _dictionary = null;
                    }
                    else
                    {
                        _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                        taskResult = _tcs.Task;
                    }
                }
                if (result is not null)
                    yield return result;
                else if (taskResult is not null)
                    yield return await taskResult.ConfigureAwait(false);
            }
        }
    }
    

    This implementation supports multiple concurrent producers, and at most one consumer.
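As a usage sketch (the producer loop and the one-second timeout are made up for illustration), consuming this implementation might look like:

```csharp
var conflate = new Conflate<int, string>();

// Any number of producers may publish concurrently.
Task producer = Task.Run(() =>
{
    for (int i = 0; i < 1000; i++)
        conflate.Publish(i % 10, $"value-{i}");
});

// A single consumer drains conflated batches until canceled.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
try
{
    await foreach (var batch in conflate.GetValues(cts.Token))
        Console.WriteLine($"Drained a batch of {batch.Count} item(s).");
}
catch (OperationCanceledException)
{
    // Expected: cancellation is currently the only way to end the enumeration.
}
```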

    Ideally you would need an extra method like CompletePublishing, so that the producer can signal that no more items are going to be produced, allowing the consumer to exit the await foreach loop after processing all the batches. The CancellationToken is not ideal for this role, because it cancels the consumer abruptly, potentially before consuming the final batch; at least that's the expected behavior for a CancellationToken. Compare, for example, the CompleteAdding method of BlockingCollection<T> and the Complete method of ChannelWriter<T>.
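    One possible shape for such a method, sketched here as a hypothetical extension of the class above (the `_completed` flag and the null-batch sentinel are assumptions, not part of the original answer):

```csharp
// Hypothetical sketch of a completion signal, in the spirit of
// BlockingCollection<T>.CompleteAdding. Not part of the original answer.
private bool _completed;

public void CompletePublishing()
{
    lock (_locker)
    {
        _completed = true;
        if (_tcs is not null)
        {
            // Wake a consumer that is awaiting an empty queue. A null batch
            // is used here as the "no more items" sentinel.
            _tcs.SetResult(null);
            _tcs = null;
        }
    }
}

// GetValues would then check the flag while holding the lock: when the
// dictionary is empty and _completed is true, it exits the loop (yield break)
// instead of creating a new TaskCompletionSource; likewise, awaiting a null
// batch would be interpreted as the end of the sequence.
```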