c# asynchronous stream producer-consumer asp.net-core-hosted-services

C#: process IAsyncEnumerable items with several parallel tasks in an ASP.NET Core hosted service consumer


I'm implementing a producer-consumer pattern in an ASP.NET Core app using a hosted service. I got it working to the point where the consumer processes items from _recordProcessingChannel.ReadAllAsync() one at a time.

I'm now trying to split the items from _recordProcessingChannel.ReadAllAsync() across several parallel tasks. For example: if 10000 items are read from the channel, I want to split the work into 4 separate tasks so that each ICMService processes 2500 items.

consumer:

await foreach (var record in _recordProcessingChannel.ReadAllAsync())
{
    using var scope = _serviceProvider.CreateScope();
    var processor = scope.ServiceProvider.GetRequiredService<ICMService>();

    processor.UploadRecord(record);
}

reader:

public IAsyncEnumerable<RecordData> ReadAllAsync(CancellationToken ct = default) => _channel.Reader.ReadAllAsync(ct);

Thank you in advance for any help provided


Solution

  • You can start the required number of processing tasks and use a BlockingCollection to enqueue the work. Something like this:

    // my dummy async enumerable
    public async IAsyncEnumerable<int> ReadAllAsync()
    {
        for (int i = 0; i < 3; i++)
        {
            yield return i*3 + 1;
            yield return i*3 + 2;
            yield return i*3 + 3;
            await Task.Delay(200);
        }
        yield return 777;
    }
    var collection = new BlockingCollection<int>();
    // start "processors"
    var tasks = Enumerable.Range(0, 4)
        .Select(i => 
            Task.Run(() =>
            {
                while (!collection.IsCompleted)
                {           
                    int? data = null;
                    try
                    {
                        data = collection.Take();
                    }
                    catch (InvalidOperationException) { }
    
                    if (data != null)
                    {
                        // simulate processing 
                        Thread.Sleep(400);
                        Console.WriteLine(data.Value);
                    }
                }
                Console.WriteLine("No more items to take.");
            }))
        .ToArray();
    
    await foreach (var record in ReadAllAsync())
    {
        collection.Add(record);
    }
    collection.CompleteAdding(); // signal that enqueuing has finished
    
    await Task.WhenAll(tasks);
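
The consumer loop above can probably be shortened with BlockingCollection<T>.GetConsumingEnumerable(), which blocks until an item is available and ends once CompleteAdding() has been called and the collection is drained. The following is only a sketch of that variant, not the method from the answer: the `processed` bag, the shortened delays, and the 10-item dummy source are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>();
var processed = new ConcurrentBag<int>(); // hypothetical: records what the workers handled

// Start 4 "processors". Each one pulls items until CompleteAdding()
// has been called and the collection is empty.
var tasks = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() =>
    {
        foreach (var data in collection.GetConsumingEnumerable())
        {
            Thread.Sleep(50); // simulate processing
            processed.Add(data);
        }
    }))
    .ToArray();

await foreach (var record in ReadAllAsync())
{
    collection.Add(record);
}
collection.CompleteAdding(); // signal that enqueuing has finished

await Task.WhenAll(tasks);
Console.WriteLine($"Processed {processed.Count} items.");

// Dummy source standing in for the real channel reader (assumption).
static async IAsyncEnumerable<int> ReadAllAsync()
{
    for (int i = 1; i <= 10; i++)
    {
        yield return i;
        await Task.Delay(10);
    }
}
```

Each worker still occupies a thread while it waits inside GetConsumingEnumerable(), so this only simplifies the loop; it does not make the waiting asynchronous.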
    

    This can be improved by introducing some async signaling (for example with SemaphoreSlim.WaitAsync, or with AsyncManualResetEvent.WaitAsync from a third-party library) so that the consumers do not block a thread while waiting for new items. For example:

    var collection = new ConcurrentQueue<int>();
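    var semaphore = new SemaphoreSlim(0); // no maxCount: with a maximum of 4, Release() throws SemaphoreFullException once more than 4 items are pending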
    var cts = new CancellationTokenSource(); // to signal that queueing is completed
    var tasks = Enumerable.Range(0, 4)
        .Select(i => 
            Task.Run(async () =>
            {
                while (true)
                {
                    if (cts.Token.IsCancellationRequested && collection.IsEmpty)
                    {
                        Console.WriteLine("No more items to take.");
                        break;
                    }
                    else if (!cts.Token.IsCancellationRequested)
                    {
                        try
                        {
                            await semaphore.WaitAsync(cts.Token);
                        }
                        catch (OperationCanceledException)
                        {
                            //ignore
                        }
                    }
    
                    if(collection.TryDequeue(out var data))
                    {
                        //simulate work
                        Thread.Sleep(400);
                        Console.WriteLine(data);
                    }                   
                }
            }))
        .ToArray();
    
    await foreach (var record in ReadAllAsync())
    {
        collection.Enqueue(record);
        semaphore.Release();
    }
    cts.Cancel(); // addition completed.
    await Task.WhenAll(tasks);
    Console.WriteLine("end");
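
Since the source in the question is already a channel, another option may be to skip the intermediate collection and let several tasks read from the same channel reader. A ChannelReader<T> allows multiple concurrent readers unless the channel was created with the SingleReader option, and each item goes to exactly one of them. The sketch below is an assumption-laden illustration rather than the answer's approach: the int payload, the `workerCount` constant, the `processed` bag, and the simulated delay are all made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
var processed = new ConcurrentBag<int>(); // hypothetical: records what the workers handled
const int workerCount = 4;                // hypothetical degree of parallelism

var workers = Enumerable.Range(0, workerCount)
    .Select(_ => Task.Run(async () =>
    {
        // Every worker runs its own ReadAllAsync loop over the shared reader;
        // the loop ends when the writer is completed and the channel is empty.
        await foreach (var record in channel.Reader.ReadAllAsync())
        {
            // In the hosted service, the scope and ICMService from the
            // question would be created here, once per record.
            await Task.Delay(20); // simulate work
            processed.Add(record);
        }
    }))
    .ToArray();

// Producer side: write some items, then mark the channel as complete.
for (int i = 1; i <= 20; i++)
{
    await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();

await Task.WhenAll(workers);
Console.WriteLine($"Processed {processed.Count} items.");
```

Items are not divided into fixed batches of equal size here; each worker simply takes the next available item, so faster workers end up handling more records. On .NET 6 or later, Parallel.ForEachAsync also accepts an IAsyncEnumerable<T> source together with a MaxDegreeOfParallelism option, which may be simpler still.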