I have an ASP.NET Core Web API that uses queued background tasks as described here.
I've used the code sample provided and added IBackgroundTaskQueue, BackgroundTaskQueue and QueuedHostedService exactly as described in the article.
In my Startup.cs, I'm registering only one QueuedHostedService instance, as follows:
services.AddHostedService<QueuedHostedService>();
Tasks coming from the Web API's controller are enqueued, then dequeued and executed one by one by the QueuedHostedService.
I would like to allow more than one background processing thread to dequeue and execute the incoming tasks.
The most straightforward solution I can come up with is to register more than one instance of the QueuedHostedService in my Startup.cs, i.e. something like this:
int maxNumOfParallelOperations;
var isValid = int.TryParse(Configuration["App:MaxNumOfParallelOperations"], out maxNumOfParallelOperations);
maxNumOfParallelOperations = isValid && maxNumOfParallelOperations > 0 ? maxNumOfParallelOperations : 2;
for (int index = 0; index < maxNumOfParallelOperations; index++)
{
    services.AddHostedService<QueuedHostedService>();
}
I've also noticed that, thanks to the signal semaphore in BackgroundTaskQueue, the QueuedHostedService instances are not busy all the time; they wake up only when a new task is available in the queue.
This solution seems to work just fine in my tests.
But in this particular use case, is it really a valid, recommended solution for parallel processing?
You can use an IHostedService with a number of threads to consume the IBackgroundTaskQueue.
Here is a basic implementation. I assume you're using the same IBackgroundTaskQueue and BackgroundTaskQueue described here.
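using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class QueuedHostedService : IHostedService
{
    private readonly ILogger _logger;
    private readonly Task[] _executors;
    private readonly int _executorsCount = 2; // default number of parallel consumers
    private CancellationTokenSource _tokenSource;

    public IBackgroundTaskQueue TaskQueue { get; }

    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILoggerFactory loggerFactory,
        IConfiguration configuration)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();

        // Keep the default when the setting is missing, non-numeric, or zero.
        if (ushort.TryParse(configuration["App:MaxNumOfParallelOperations"], out var ct) && ct > 0)
        {
            _executorsCount = ct;
        }
        _executors = new Task[_executorsCount];
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");
        _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        for (var i = 0; i < _executorsCount; i++)
        {
            // Task.Run tracks the whole async loop; new Task(async () => ...) would
            // report completion at the first await, so StopAsync could not wait on it.
            _executors[i] = Task.Run(() => ProcessQueueAsync(_tokenSource.Token));
        }
        return Task.CompletedTask;
    }

    // One consumer loop; StartAsync runs _executorsCount of these in parallel.
    private async Task ProcessQueueAsync(CancellationToken stoppingToken)
    {
        // Observe the service's own token so that StopAsync can end the loop.
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
#if DEBUG
                _logger.LogInformation("Waiting background task...");
#endif
                var workItem = await TaskQueue.DequeueAsync(stoppingToken);
#if DEBUG
                _logger.LogInformation("Got background task, executing...");
#endif
                await workItem(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected during shutdown; the loop condition ends the executor.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error occurred executing {WorkItem}.", nameof(workItem: null) == null ? "workItem" : "workItem"
                );
            }
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        if (_tokenSource == null)
        {
            return; // StartAsync was never called
        }
        _tokenSource.Cancel(); // send the cancellation signal
        // Wait for the executors to finish, or give up when the host's shutdown token fires.
        await Task.WhenAny(Task.WhenAll(_executors), Task.Delay(Timeout.Infinite, cancellationToken));
    }
}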
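One detail worth checking in consumer loops like this is how each long-running task is created, because StopAsync can only wait for work that the stored tasks actually track. The sketch below is a standalone illustration, not part of the service: `TaskCtorDemo` and `CompareAsync` are hypothetical names. It suggests that a task built with `new Task(async () => ...)` reports completion at the first await (the lambda is converted to an `Action`), whereas `Task.Run` with an async delegate stays incomplete until the body finishes.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskCtorDemo
{
    // Returns, for each way of creating the task, whether it reported
    // completion while its async body was still waiting.
    public static async Task<(bool ctorCompletedEarly, bool runCompletedEarly)> CompareAsync()
    {
        var gate = new TaskCompletionSource<bool>();

        // The Task constructor only accepts Action delegates, so this async
        // lambda is compiled as async void and returns at its first await.
        var viaCtor = new Task(async () => { await gate.Task; });
        viaCtor.Start();
        await viaCtor; // completes although the body is still waiting on the gate
        var ctorCompletedEarly = viaCtor.IsCompleted && !gate.Task.IsCompleted;

        // Task.Run(Func<Task>) returns a task that represents the whole body.
        var viaRun = Task.Run(async () => { await gate.Task; });
        await Task.Delay(100); // give the delegate time to start and reach its await
        var runCompletedEarly = viaRun.IsCompleted;

        gate.SetResult(true); // let both bodies finish
        await viaRun;
        return (ctorCompletedEarly, runCompletedEarly);
    }

    public static void Main()
    {
        var result = CompareAsync().GetAwaiter().GetResult();
        Console.WriteLine($"new Task completed early: {result.ctorCompletedEarly}");
        Console.WriteLine($"Task.Run completed early: {result.runCompletedEarly}");
    }
}
```

If the executors are meant to be awaited on shutdown, creating them with `Task.Run` (or storing the `Task` returned by an async method) is the safer choice.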
You need to register the services in ConfigureServices in the Startup class.
...
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
...
Additionally, you can set the number of threads in the configuration (appsettings.json):
...
"App": {
  "MaxNumOfParallelOperations": 4
}
...
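To try the multi-consumer pattern outside ASP.NET Core, a minimal sketch could look like the following. `DemoTaskQueue`, `ParallelConsumersDemo` and `RunAsync` are hypothetical names, and the queue is only my assumption of the general shape of the article's BackgroundTaskQueue (a concurrent queue signaled by a semaphore), not a copy of it. The method reports how many items were processed and the highest number that ran at the same time, which should never exceed the number of consumers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Assumed queue shape: each Enqueue releases the semaphore once,
// so exactly one idle consumer wakes up per work item.
public class DemoTaskQueue
{
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _items =
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void Enqueue(Func<CancellationToken, Task> workItem)
    {
        _items.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken token)
    {
        await _signal.WaitAsync(token); // idle consumers wait here without spinning
        _items.TryDequeue(out var workItem);
        return workItem;
    }
}

public static class ParallelConsumersDemo
{
    // Processes itemCount work items with consumerCount parallel consumers.
    public static async Task<(int processed, int peak)> RunAsync(int consumerCount, int itemCount)
    {
        var queue = new DemoTaskQueue();
        var cts = new CancellationTokenSource();
        var sync = new object();
        int processed = 0, running = 0, peak = 0;
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (itemCount == 0) done.TrySetResult(true);

        for (var i = 0; i < itemCount; i++)
        {
            queue.Enqueue(async token =>
            {
                lock (sync) { running++; peak = Math.Max(peak, running); }
                await Task.Delay(50, token); // simulated work
                lock (sync)
                {
                    running--;
                    if (++processed == itemCount) done.TrySetResult(true);
                }
            });
        }

        // Start the consumers; Task.Run tracks each loop until it exits.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(n => Task.Run(() => ConsumeAsync(queue, cts.Token)))
            .ToArray();

        await done.Task;               // every item has been processed
        cts.Cancel();                  // ask the idle consumers to stop
        await Task.WhenAll(consumers); // and wait until they have exited
        return (processed, peak);
    }

    private static async Task ConsumeAsync(DemoTaskQueue queue, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                var workItem = await queue.DequeueAsync(token);
                await workItem(token);
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled during shutdown.
            }
        }
    }
}
```

For example, `ParallelConsumersDemo.RunAsync(4, 8)` uses four consumers for eight items, mirroring a MaxNumOfParallelOperations value of 4.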