Buffering IAsyncEnumerable in producer/consumer scenario


I have a scenario in which I am reading some data from a database. This data is returned in the form of an IAsyncEnumerable<MyData>. After reading the data I want to send it to a consumer, which is asynchronous. Right now my code looks something like this:

// C#
IAsyncEnumerable<MyData> enumerable = this.dataSource.Read(query);

await foreach (var data in enumerable) 
{
    await this.consumer.Write(data);
}

My problem with this is that while I am enumerating the database, I am holding a lock on the data. I don't want to hold this lock for longer than I need to.

If the consumer consumes data more slowly than the producer produces it, is there any way I can eagerly read from the data source without just calling ToList or ToListAsync? I want to avoid reading all the data into memory at once, which would cause the opposite problem if the producer is slower than the consumer. It is OK if the lock on the database is not as short as possible; I want a configurable tradeoff between how much data is in memory at once and how long we keep the enumeration running.

My thought is that there would be some way to use a queue or channel-like data structure to act as a buffer between the producer and consumer.

In Golang I would do something like this:

// go
queue := make(chan MyData, BUFFER_SIZE)
go dataSource.Read(query, queue)

// Read sends data on the channel, closes it when done

for data := range queue {
    consumer.Write(data)
}

Is there any way to get similar behavior in C#?


Solution

  • Here is a more robust implementation of the ConsumeBuffered extension method in Rafael's answer. This one uses a Channel<T> as the buffer instead of a BlockingCollection<T>. The advantage is that enumerating the two sequences, the source and the buffered one, does not block a thread each. Care has been taken to cancel the enumeration of the source sequence and wait for it to finish, in case the enumeration of the buffered sequence is abandoned prematurely by the consumer downstream.

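    // Requires: using System.Runtime.CompilerServices; (EnumeratorCancellation)
    //           using System.Threading.Channels;       (Channel<T>)
    // Extension methods must be declared inside a static class.
    public static async IAsyncEnumerable<T> ConsumeBuffered<T>(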
        this IAsyncEnumerable<T> source, int capacity,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        Channel<T> channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            SingleWriter = true,
            SingleReader = true,
        });
        using CancellationTokenSource completionCts = new();
    
        Task producer = Task.Run(async () =>
        {
            try
            {
                await foreach (T item in source.WithCancellation(completionCts.Token)
                    .ConfigureAwait(false))
                {
                    await channel.Writer.WriteAsync(item).ConfigureAwait(false);
                }
            }
            catch (ChannelClosedException) { } // Ignore
            finally { channel.Writer.TryComplete(); }
        });
    
        try
        {
            await foreach (T item in channel.Reader.ReadAllAsync(cancellationToken)
                .ConfigureAwait(false))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }
            await producer.ConfigureAwait(false); // Propagate possible source error
        }
        finally
        {
            // Prevent fire-and-forget in case the enumeration is abandoned
            if (!producer.IsCompleted)
            {
                completionCts.Cancel();
                channel.Writer.TryComplete();
                await Task.WhenAny(producer).ConfigureAwait(false);
            }
        }
    }
    

    Setting the SingleWriter and SingleReader options of the bounded channel is a bit academic, and they could be omitted. Currently (.NET 6) there is only one bounded Channel<T> implementation in the System.Threading.Channels library, regardless of the supplied options. This implementation is based on a Deque<T> (an internal .NET type similar to Queue<T>) synchronized with a lock.
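
    For comparison with the Go snippet in the question, here is a minimal sketch of using a bounded channel directly, created with the capacity-only overload and no options object. The names BoundedChannelDemo, ReadAsync and RunAsync are hypothetical stand-ins, not part of the answer above, and the sketch deliberately leaves out the abandonment and cancellation handling that ConsumeBuffered adds.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelDemo
{
    // Hypothetical stand-in for dataSource.Read(query): yields 0..count-1.
    private static async IAsyncEnumerable<int> ReadAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<List<int>> RunAsync(int count, int capacity)
    {
        // Roughly `make(chan int, capacity)`: writers wait when the buffer is full.
        Channel<int> channel = Channel.CreateBounded<int>(capacity);

        // Roughly `go dataSource.Read(query, queue)`.
        Task producer = Task.Run(async () =>
        {
            try
            {
                await foreach (int item in ReadAsync(count))
                {
                    // Waits asynchronously (no blocked thread) while the buffer is full.
                    await channel.Writer.WriteAsync(item);
                }
                channel.Writer.TryComplete(); // Roughly `close(queue)`
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex); // Surface the failure to the reader
            }
        });

        // Roughly `for data := range queue`.
        var received = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(1); // Hypothetical slow consumer
            received.Add(item);
        }
        await producer;
        return received;
    }
}
```

    With a capacity of 2, the producer in this sketch can run at most a couple of items ahead of the slow consumer, which is the configurable tradeoff between memory use and enumeration time that the question asks for.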

    The channel is enumerated inside a try/finally block, because C# iterators execute the finally blocks as part of the Dispose/DisposeAsync method of the autogenerated IEnumerator<T>/IAsyncEnumerator<T>, when the enumeration is abandoned.
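
    The behavior described above can be observed with a small sketch. The names IteratorFinallyDemo, Numbers and Log are hypothetical and exist only for illustration: the consumer breaks out of the loop early, and the iterator's finally block runs when await foreach disposes the enumerator, before execution continues after the loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class IteratorFinallyDemo
{
    public static readonly List<string> Log = new();

    // An endless async iterator whose cleanup lives in a finally block.
    private static async IAsyncEnumerable<int> Numbers()
    {
        try
        {
            for (int i = 0; ; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            // Runs as part of DisposeAsync when the enumeration is abandoned.
            Log.Add("finally");
        }
    }

    public static async Task RunAsync()
    {
        await foreach (int n in Numbers())
        {
            Log.Add($"item {n}");
            if (n == 1) break; // Abandon the enumeration early
        }
        Log.Add("after loop");
    }
}
```

    After RunAsync completes, Log contains "item 0", "item 1", "finally", "after loop", in that order. This is the hook that ConsumeBuffered relies on to cancel the producer when the downstream consumer stops enumerating.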

    Note: In case the external CancellationToken is canceled, the cancellation is propagated as an OperationCanceledException, and all the buffered items are lost. In a producer-consumer scenario with multiple producers and consumers, this might be a problem. It is advised that the CancellationToken is used only for the purpose of destroying the entire processing pipeline, not for parts of it.