Tags: c#, concurrency, producer-consumer

Efficiently reading two different System.Threading.Channels from the same thread


Assume the following situation:

  • I have two different event streams
  • Stream 1 is a pull-based stream of very fast-moving events (think financial instrument prices)
  • Stream 2 is much slower-moving (think of actions to take based on those prices)
  • Both streams must be processed on one joint worker thread/task
  • The implementation must be as efficient as possible
  • Stream 2 should be processed with priority / ahead of stream 1

One way of doing this would be to push everything into a single System.Threading.Channels.Channel<AnyOf<Event1, Event2>>. This fulfills all of the requirements above except the last one: to the best of my knowledge, there is no way to prioritize Event2 items over Event1 items within a single channel.

A possibly better way would be to push these events into separate channels, and read from both of these channels.

What is the best way of doing so, however?

Roughly speaking, I could do this:

class Example {
  private readonly Channel<Event1> _channel1 = Channel.CreateBounded<Event1>(....);
  private readonly Channel<Event2> _channel2 = Channel.CreateBounded<Event2>(....);

  public async Task Worker(CancellationToken ct) {
    while(!ct.IsCancellationRequested) {
      var valueTask1 = _channel1.Reader.WaitToReadAsync(ct);
      var valueTask2 = _channel2.Reader.WaitToReadAsync(ct);
      
      // Waaait a moment.. this is wishful thinking! WhenAny for ValueTasks does not exist, at least not out-of-the-box
      if(await ValueTaskExtensions.WhenAny(valueTask1, valueTask2) == 0)
      {
        // valueTask1 triggered
        var element = await _channel1.Reader.ReadAsync(ct);
        // Process
      } else {
        // valueTask2 triggered
        var element = await _channel2.Reader.ReadAsync(ct);
        // Process
      }
    }
  }

}

The caveat is that there is no built-in ValueTask.WhenAny(). Converting the value tasks into proper Task instances seems very wasteful, because of the allocations it may incur for each incoming price tick.

There are, however, community contributions for a WhenAny implementation, though I have not used those in practice yet and can't judge their quality or possible caveats.

Is there a more clever way to read from two channels on the same reader thread, without having to leave the wonderful world of ValueTasks?


Solution

  • I came up with this consuming method, which consumes two channels and prioritizes the first one. As long as the first channel has an element available for consumption, ConsumeBothPrioritized will consume that element, and no element from the second channel will be consumed. For the scenario in the question, this means passing the reader of the slower, higher-priority channel (stream 2) as the first argument.

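    using System;
    using System.Collections.Generic;
    using System.Runtime.CompilerServices; // [EnumeratorCancellation]
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public readonly record struct AnyOf<T1, T2>(T1 Item1, T2 Item2, int Index);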
    
    /// <summary>
    /// Consumes two channels, prioritizing the first channel over the second.
    /// </summary>
    public static async IAsyncEnumerable<AnyOf<T1, T2>> ConsumeBothPrioritized<T1, T2>(
        ChannelReader<T1> channelReader1, ChannelReader<T2> channelReader2,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(channelReader1);
        ArgumentNullException.ThrowIfNull(channelReader2);
        cancellationToken.ThrowIfCancellationRequested();
        Task<bool> task1 = null;
        Task<bool> task2 = null;
        while (true)
        {
            // Repeatedly consume the first channel until it is empty.
            while (channelReader1.TryRead(out T1 item1))
            {
                yield return new(item1, default, 1);
                cancellationToken.ThrowIfCancellationRequested();
                task1 = null; // Discard the old wait task. We will need a new one.
            }
    
            // Consume just one item from the second channel.
            if (channelReader2.TryRead(out T2 item2))
            {
                yield return new(default, item2, 2);
                cancellationToken.ThrowIfCancellationRequested();
                task2 = null; // Discard the old wait task. We will need a new one.
                continue; // Continue consuming the first channel again.
            }
    
            // Both channels are currently empty.
            // Wait until one of the channels becomes non-empty.
            task1 ??= channelReader1.WaitToReadAsync(cancellationToken).AsTask();
            task2 ??= channelReader2.WaitToReadAsync(cancellationToken).AsTask();
            Task completedTask;
            if (channelReader1.Completion.IsCompleted)
            {
                // Ignore the first channel. It has already completed.
                completedTask = await Task.WhenAny(task2).ConfigureAwait(false);
            }
            else if (channelReader2.Completion.IsCompleted)
            {
                // Ignore the second channel. It has already completed.
                completedTask = await Task.WhenAny(task1).ConfigureAwait(false);
            }
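            else
            {
                // Take into account both channels.
                completedTask = await Task.WhenAny(task1, task2).ConfigureAwait(false);
            }

            // Surface cancellation here. Task.WhenAny does not rethrow, so without
            // this check canceled wait tasks would make the loop spin while both
            // channels are empty.
            cancellationToken.ThrowIfCancellationRequested();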
    
            if (channelReader1.Completion.IsCompleted
                && channelReader2.Completion.IsCompleted)
            {
                // Both channels have completed.
                // Propagate possible exceptions before exiting.
                await Task.WhenAll(channelReader1.Completion,
                    channelReader2.Completion).ConfigureAwait(false);
                yield break;
            }
    
            // Discard the task that we know has completed.
            if (completedTask == task1)
                task1 = null;
            else if (completedTask == task2)
                task2 = null;
        }
    }
    

    This method minimizes the number of WaitToReadAsync tasks that it creates. If one of the channels stays empty during many iterations of the while loop, the same WaitToReadAsync task instance is awaited in all of these iterations. This is why the ValueTask<bool> is converted to a Task<bool> (with the AsTask method): unlike a ValueTask<bool>, a Task<bool> can be safely awaited multiple times. The conversion also makes it possible to use the standard TPL methods Task.WhenAny and Task.WhenAll, which are not natively available for value tasks.
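
To illustrate the reuse described above, here is a minimal sketch (not part of the method itself) in which one converted wait task takes part in several Task.WhenAny calls. The channel names busy and idle are hypothetical, and the snippet assumes C# top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical channels, for illustration only.
var busy = Channel.CreateUnbounded<int>();
var idle = Channel.CreateUnbounded<string>();

// Converted once; this Task<bool> stays pending because nothing is written to 'idle'.
Task<bool> idleWait = idle.Reader.WaitToReadAsync().AsTask();

var busyWon = new List<bool>();
for (int i = 0; i < 3; i++)
{
    Task<bool> busyWait = busy.Reader.WaitToReadAsync().AsTask();
    busy.Writer.TryWrite(i);

    // The same idleWait instance takes part in every iteration.
    Task winner = await Task.WhenAny(busyWait, idleWait);
    busyWon.Add(ReferenceEquals(winner, busyWait));

    busy.Reader.TryRead(out _); // Drain, so that the next wait starts from an empty channel.
}

Console.WriteLine(string.Join(",", busyWon)); // True,True,True
Console.WriteLine(idleWait.IsCompleted);      // False
```

Only one wait task is ever created for the idle channel here, no matter how many items flow through the busy one.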

    If one channel completes with an exception, the other channel continues being consumed. The error of the faulted channel is surfaced when the other channel completes as well (provided, of course, that the caller of ConsumeBothPrioritized does not abandon the enumeration prematurely).

    In case both channels complete with an exception, only the exception of the first channel is propagated through the resulting IAsyncEnumerable<AnyOf<T1, T2>>.
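
This behavior comes from awaiting Task.WhenAll: the await rethrows only the first exception, while the combined task still records both. A small sketch with two hypothetical channels (ch1 and ch2, completed with arbitrary exception types chosen for illustration) shows the difference:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical channels that both complete with an error.
var ch1 = Channel.CreateUnbounded<int>();
var ch2 = Channel.CreateUnbounded<int>();
ch1.Writer.Complete(new InvalidOperationException("first"));
ch2.Writer.Complete(new TimeoutException("second"));

Task both = Task.WhenAll(ch1.Reader.Completion, ch2.Reader.Completion);
string rethrown = "none";
try
{
    await both; // Rethrows only the first exception.
}
catch (Exception ex)
{
    rethrown = ex.GetType().Name;
}

// The combined task itself still holds every exception.
int recorded = both.Exception?.InnerExceptions.Count ?? 0;

Console.WriteLine(rethrown); // InvalidOperationException
Console.WriteLine(recorded); // 2
```

A caller that needs the second error as well could inspect the Completion task of each reader after the enumeration ends.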

    Usage example:

    await foreach (var item in ConsumeBothPrioritized(channel1.Reader, channel2.Reader))
    {
        if (item.Index == 1)
        {
            // Do something with item.Item1
        }
        else if (item.Index == 2)
        {
            // Do something with item.Item2
        }
    }
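
The order in which items come out can be illustrated with just the synchronous TryRead part of the loop. The following is a simplified sketch, not the method above: it has no waiting, completion, or cancellation handling, and the channel names actions (high priority) and prices (low priority) are hypothetical stand-ins for the streams in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical channels: 'actions' has priority over 'prices'.
var actions = Channel.CreateUnbounded<string>();
var prices = Channel.CreateUnbounded<int>();

prices.Writer.TryWrite(1);
prices.Writer.TryWrite(2);
actions.Writer.TryWrite("A");

var order = new List<string>();
while (true)
{
    // Drain the priority channel completely.
    while (actions.Reader.TryRead(out string? action))
        order.Add(action);

    // Then take a single item from the other channel and re-check the priority channel.
    if (prices.Reader.TryRead(out int price))
    {
        order.Add(price.ToString());
        if (price == 1) actions.Writer.TryWrite("B"); // A new action arrives mid-stream.
        continue;
    }

    break; // Both channels are empty; the real method would wait here instead.
}

Console.WriteLine(string.Join(",", order)); // A,1,B,2
```

The action "B" is handled before the price 2, even though the price was written to its channel earlier.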