c# asp.net asp.net-core parallel-processing asp.net-core-hosted-services

ASP.NET Core queued background tasks parallel processing


I have an ASP.NET Core Web API that uses queued background tasks, as described here.

I've used the code sample provided and added the IBackgroundTaskQueue, BackgroundTaskQueue and QueuedHostedService exactly as described in the article.

In my Startup.cs, I'm registering only one QueuedHostedService instance as follows: services.AddHostedService<QueuedHostedService>();

Tasks coming from the WebApi's controller are enqueued and then dequeued and executed one by one by the QueuedHostedService.

I would like to allow more than one background processing thread to dequeue and execute the incoming tasks. The most straightforward solution I can come up with is to register more than one instance of the QueuedHostedService in my Startup.cs, i.e., something like this:

 int maxNumOfParallelOperations;
 var isValid = int.TryParse(Configuration["App:MaxNumOfParallelOperations"], out maxNumOfParallelOperations);

 maxNumOfParallelOperations = isValid && maxNumOfParallelOperations > 0 ? maxNumOfParallelOperations : 2;

 for (int index = 0; index < maxNumOfParallelOperations; index++) 
 {
    services.AddHostedService<QueuedHostedService>();
 }

I've also noticed that, thanks to the signal semaphore in BackgroundTaskQueue, the QueuedHostedService instances are not busy all the time; they only wake up when a new task is available in the queue.
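
For reference, the wake-up behavior comes from how the queue pairs a concurrent queue with a semaphore. The following is a minimal sketch of the queue, assuming it matches the article's sample (the member names here are written from memory of that sample and should be checked against the article itself):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    // Starts at 0, so every consumer waits until something is enqueued.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release(); // lets exactly one waiting consumer through
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        // Asynchronous wait: no thread is blocked while the queue is empty.
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);
        return workItem;
    }
}
```

Because each Release() admits only one waiter, several consumers can share the same queue instance without two of them receiving the same work item.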

This solution seems to work just fine in my tests.

But in this particular use case, is it really a valid, recommended solution for parallel processing?


Solution

  • You can use an IHostedService with a number of threads to consume the IBackgroundTaskQueue.

    Here is a basic implementation. I assume you're using the same IBackgroundTaskQueue and BackgroundTaskQueue described here.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class QueuedHostedService : IHostedService
    {
        private readonly ILogger _logger;

        private readonly Task[] _executors;
        private readonly int _executorsCount = 2; // default number of consumers
        private CancellationTokenSource _tokenSource;
        public IBackgroundTaskQueue TaskQueue { get; }

        public QueuedHostedService(IBackgroundTaskQueue taskQueue,
            ILoggerFactory loggerFactory,
            IConfiguration configuration)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();

            // Keep the default when the setting is missing, non-numeric, or zero.
            if (ushort.TryParse(configuration["App:MaxNumOfParallelOperations"], out var ct) && ct > 0)
            {
                _executorsCount = ct;
            }
            _executors = new Task[_executorsCount];
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");

            _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var stoppingToken = _tokenSource.Token;

            for (var i = 0; i < _executorsCount; i++)
            {
                // Task.Run unwraps the async delegate, so each stored task
                // completes only when its consumer loop has exited.
                _executors[i] = Task.Run(() => ConsumeAsync(stoppingToken));
            }

            return Task.CompletedTask;
        }

        // Consumer loop: waits for a work item, runs it, and repeats until
        // the token cancelled in StopAsync is signalled.
        private async Task ConsumeAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
    #if DEBUG
                    _logger.LogInformation("Waiting background task...");
    #endif
                    var workItem = await TaskQueue.DequeueAsync(stoppingToken);
    #if DEBUG
                    _logger.LogInformation("Got background task, executing...");
    #endif
                    await workItem(stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break; // shutdown requested while waiting or executing
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error occurred executing a background work item.");
                }
            }
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is stopping.");

            if (_tokenSource == null)
            {
                return; // StartAsync was never called
            }

            _tokenSource.Cancel(); // send the cancellation signal

            // Wait for the consumers to finish, but stop waiting once the
            // host's shutdown token fires.
            await Task.WhenAny(
                Task.WhenAll(_executors),
                Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
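
One detail matters when the consumer loop is an async delegate: how the task is created determines what StopAsync can actually wait for. The Task constructor only accepts an Action, so an async lambda passed to it becomes async void and the task is reported as completed at the first await. Task.Run has an overload for Func<Task> and returns a task that covers the whole delegate. The following is a small standalone sketch of the difference; TaskStartDemo and its members are hypothetical names used only for illustration:

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStartDemo
{
    public static async Task<(bool viaConstructor, bool viaRun)> RunAsync()
    {
        var finishedA = false;

        // The lambda is converted to an Action (async void), so this task
        // completes as soon as the lambda reaches its first await.
        var a = new Task(async () =>
        {
            await Task.Delay(200);
            finishedA = true;
        });
        a.Start();
        await a;
        var seenA = finishedA; // read right after the task reports completion

        var finishedB = false;

        // Task.Run(Func<Task>) returns a task that completes only after
        // the whole async delegate has finished.
        var b = Task.Run(async () =>
        {
            await Task.Delay(200);
            finishedB = true;
        });
        await b;

        return (seenA, finishedB);
    }
}
```

Awaiting TaskStartDemo.RunAsync() is expected to yield (false, true): the constructor-based task finishes before its body does, while the Task.Run task does not.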
    

    You need to register the services in the ConfigureServices method of the Startup class.

    ...
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
    services.AddHostedService<QueuedHostedService>();
    ...
    

    Additionally, you can set the number of threads in the configuration (appsettings.json):

    ...
    "App": {
        "MaxNumOfParallelOperations": 4
    }
    ...