c# asp.net-core batching system.threading.channels

How to read the remaining items in a Channel<T> when fewer than the batch size are left and no new items arrive within X minutes?


I am using Channel<T> from System.Threading.Channels and want to read items in batches of 5. I have a method like the one below:

public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}

In an ASP.NET Core background service, I use it like this:

public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);

            var range = string.Join(',', batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}

This works: whenever there are 5 items in the channel, I get the range.

The question is: when only 2 items are left in the channel and no new items have arrived for the last 10 minutes, how can I read those remaining 2 items?


Solution

  • You could create a linked CancellationTokenSource, so that you can watch for both an external cancellation request and an internally induced timeout at the same time. Below is an example of this technique, implemented as a ReadBatchAsync extension method for the ChannelReader<T> class:

    public static async ValueTask<T[]> ReadBatchAsync<T>(
        this ChannelReader<T> channelReader,
        int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(channelReader);
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (timeout < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(timeout));
        using CancellationTokenSource linkedCTS = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        linkedCTS.CancelAfter(timeout);
        List<T> buffer = new();
        while (true)
        {
            var token = buffer.Count == 0 ? cancellationToken : linkedCTS.Token;
            T item;
            try
            {
                item = await channelReader.ReadAsync(token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                break; // The cancellation was induced by timeout (ignore it)
            }
            catch (ChannelClosedException)
            {
                if (buffer.Count == 0) throw;
                break;
            }
            buffer.Add(item);
            if (buffer.Count >= batchSize) break;
        }
        return buffer.ToArray();
    }
    

    The timeout is measured from the moment the method is called. The method produces a batch as soon as the timeout elapses, or sooner if batchSize is reached, provided that the batch contains at least one item. If the timeout elapses while the batch is still empty, the method produces a single-item batch as soon as the first item is received.

    In case the channel has been completed by calling the channel.Writer.Complete() method, and it contains no more items, the ReadBatchAsync method propagates the same ChannelClosedException that is thrown by the native ReadAsync method.
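
    The timeout and completion behavior described above can be checked with a small console sketch. It carries a condensed copy of the extension method (argument validation omitted) so that it compiles on its own. The Demo class, its RunAsync method, and the 200 ms timeout are hypothetical and chosen only for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    // Condensed copy of the ReadBatchAsync method from the answer (no argument validation).
    public static async ValueTask<T[]> ReadBatchAsync<T>(
        this ChannelReader<T> channelReader,
        int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        using var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        linkedCTS.CancelAfter(timeout);
        var buffer = new List<T>();
        while (buffer.Count < batchSize)
        {
            // The first item is awaited without the timeout; later items are awaited with it.
            var token = buffer.Count == 0 ? cancellationToken : linkedCTS.Token;
            try
            {
                buffer.Add(await channelReader.ReadAsync(token).ConfigureAwait(false));
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                break; // Canceled by the timeout: return what has been collected
            }
            catch (ChannelClosedException) when (buffer.Count > 0)
            {
                break; // Channel completed: return the final partial batch
            }
        }
        return buffer.ToArray();
    }
}

// Hypothetical demo, not part of the original answer.
public static class Demo
{
    public static async Task<(int[] Batch, bool ClosedThrown)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);

        // Only 2 of the 5 requested items exist, so the timeout releases the batch.
        int[] batch = await channel.Reader.ReadBatchAsync(5, TimeSpan.FromMilliseconds(200));

        // A completed and empty channel makes the next call throw ChannelClosedException.
        channel.Writer.Complete();
        bool closedThrown = false;
        try
        {
            await channel.Reader.ReadBatchAsync(5, TimeSpan.FromMilliseconds(200));
        }
        catch (ChannelClosedException)
        {
            closedThrown = true;
        }
        return (batch, closedThrown);
    }

    public static async Task Main()
    {
        var (batch, closedThrown) = await RunAsync();
        Console.WriteLine(string.Join(",", batch));
        Console.WriteLine(closedThrown);
    }
}
```

    In a real service the timeout would be minutes rather than milliseconds. The short value here only keeps the demo fast.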

    In case the external CancellationToken is canceled, the cancellation is propagated by throwing an OperationCanceledException. Any items that have already been extracted internally from the ChannelReader<T> at that point are lost. This makes cancellation a destructive operation, so it is advisable to discard the whole Channel<T> afterwards.

    Usage example:

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (true)
        {
            MeasurementViewModel[] batch;
            try
            {
                batch = await _channel.Reader.ReadBatchAsync(
                    5, TimeSpan.FromMinutes(10), stoppingToken);
            }
            catch (OperationCanceledException) { return; }
            catch (ChannelClosedException) { break; }
    
            Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
        }
        await _channel.Reader.Completion; // Propagate possible failure
    }
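
    If losing the buffered items on cancellation is a concern, one possible variation is to hand back whatever has been collected instead of throwing, and to throw only when nothing has been read yet. The following is a rough sketch under that assumption, not the method from the answer above. The name ReadBatchOrPartialAsync and the demo class are hypothetical. The sketch waits with WaitToReadAsync, which consumes nothing, and drains with the synchronous TryRead:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class NonDestructiveBatching
{
    // Hypothetical variant: on timeout OR external cancellation, a non-empty
    // buffer is returned to the caller instead of being discarded.
    public static async ValueTask<T[]> ReadBatchOrPartialAsync<T>(
        this ChannelReader<T> reader, int batchSize, TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        linkedCts.CancelAfter(timeout);
        var buffer = new List<T>(batchSize);
        while (true)
        {
            // Take everything that is immediately available, up to batchSize.
            while (buffer.Count < batchSize && reader.TryRead(out var item)) buffer.Add(item);
            if (buffer.Count >= batchSize) break;

            // An empty buffer waits without the timeout, as in the original method.
            var token = buffer.Count == 0 ? cancellationToken : linkedCts.Token;
            try
            {
                if (!await reader.WaitToReadAsync(token).ConfigureAwait(false))
                {
                    // The channel is completed and drained.
                    if (buffer.Count == 0) throw new ChannelClosedException();
                    break;
                }
            }
            catch (OperationCanceledException)
            {
                // Nothing buffered: propagate the external cancellation.
                if (buffer.Count == 0) cancellationToken.ThrowIfCancellationRequested();
                break; // Otherwise return the partial batch; no item is lost
            }
        }
        return buffer.ToArray();
    }
}

// Hypothetical demo: cancellation arrives while 2 items are already buffered.
public static class NonDestructiveDemo
{
    public static async Task<int[]> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
        // The token is canceled long before the 10-minute timeout would elapse.
        return await channel.Reader.ReadBatchOrPartialAsync(
            5, TimeSpan.FromMinutes(10), cts.Token);
    }

    public static async Task Main()
    {
        int[] batch = await RunAsync();
        Console.WriteLine(string.Join(",", batch));
    }
}
```

    With this variant the caller has to check the token itself after each batch, because a canceled call can still return items.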
    

    For an alternative approach of consuming a channel in batches, whose cancellation is non-destructive, you can look at this question: