I am implementing queues based on the example at https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio.
This is how my code looks.
In Startup.cs I register my hosted service and background queue:
// Register the work-item queue as a singleton.
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
// Register the hosted service that consumes IBackgroundTaskQueue.
services.AddHostedService<QueuedHostedService>();
Then I implement the scoped service, the hosted service, and the background queue as follows:
namespace Services.Services {
    /// <summary>
    /// Hosted service that dequeues work items from an <see cref="IBackgroundTaskQueue"/>
    /// and awaits each one before dequeuing the next.
    /// </summary>
    public class QueuedHostedService : BackgroundService {
        private readonly ILogger _logger;
        private readonly IServiceProvider _serviceProvider;

        /// <summary>Creates the service and a logger named after this type.</summary>
        /// <param name="serviceProvider">Provider stored for later use.</param>
        /// <param name="taskQueue">Queue the work items are read from.</param>
        /// <param name="loggerFactory">Factory used to create this service's logger.</param>
        public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
            _serviceProvider = serviceProvider;
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }

        /// <summary>The queue this service reads from.</summary>
        public IBackgroundTaskQueue TaskQueue { get; }

        /// <summary>
        /// Dequeues and runs work items until <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <param name="cancellationToken">Passed to the dequeue call and to every work item.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
            while (!cancellationToken.IsCancellationRequested) {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
                try {
                    await workItem(cancellationToken);
                } catch (Exception ex) {
                    // A failing work item must not stop the loop, but it must not vanish
                    // silently either: record it so failed background jobs are visible.
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }
        }
    }
}
/// <summary>
/// Queue of asynchronous work items. Each item is a delegate that takes a
/// <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">The delegate to enqueue.</param>
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    /// <summary>Asynchronously obtains a work item from the queue.</summary>
    /// <param name="cancellationToken">Token supplied by the caller for the dequeue operation.</param>
    /// <returns>A task whose result is a work-item delegate.</returns>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
namespace Services.Services {
    /// <summary>
    /// <see cref="IBackgroundTaskQueue"/> backed by a <see cref="ConcurrentQueue{T}"/>.
    /// A <see cref="SemaphoreSlim"/> starting at zero is released once per enqueued item
    /// and awaited once per dequeue.
    /// </summary>
    public class BackgroundTaskQueue : IBackgroundTaskQueue {
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
            new ConcurrentQueue<Func<CancellationToken, Task>>();

        /// <summary>Enqueues a work item and releases the semaphore once.</summary>
        /// <param name="workItem">The delegate to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
            _workItems.Enqueue(workItem ?? throw new ArgumentNullException(nameof(workItem)));
            _signal.Release();
        }

        /// <summary>Waits on the semaphore, then takes one item from the queue.</summary>
        /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
        /// <returns>The dequeued delegate, or null if the queue yielded nothing.</returns>
        public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
            await _signal.WaitAsync(cancellationToken);
            return _workItems.TryDequeue(out var next) ? next : null;
        }
    }
}
// scoped service
namespace Services.Services {
    /// <summary>
    /// Imports uploaded files: either directly through <see cref="ImportFile"/>, or by
    /// enqueuing a work item on <see cref="Queue"/> that performs the import later.
    /// </summary>
    public class ImportService : BaseService, IImportService {
        private const string AZURE_BLOB_CONTAINER = "blobcontainer";

        private readonly IFileProcessingService _scopedProcessingService;
        private readonly ConfigurationSettings _configurationSettings;

        // Used to create the scope inside the queued work item. The work item runs after the
        // enqueuing call has returned; if this service was resolved from a request scope, the
        // provider in Services may already be disposed by then, whereas the scope factory
        // can be held beyond the request.
        private readonly IServiceScopeFactory _scopeFactory;

        /// <summary>Queue that deferred import work is pushed onto.</summary>
        public IBackgroundTaskQueue Queue { get; }

        /// <summary>The provider this service was constructed with.</summary>
        public IServiceProvider Services { get; }

        /// <summary>Resolves this service's collaborators from <paramref name="services"/>.</summary>
        /// <param name="services">Provider used to resolve dependencies; also passed to the base class.</param>
        /// <param name="queue">Queue used for deferred processing.</param>
        public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services) {
            Services = services;
            _configurationSettings = services.GetService<ConfigurationSettings>();
            _scopedProcessingService = services.GetProcessingService();
            _scopeFactory = services.GetRequiredService<IServiceScopeFactory>();
            Queue = queue;
        }

        // ---- Main file
        /// <summary>Forwards the import to the injected file-processing service.</summary>
        public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
            await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
        }

        /// <summary>
        /// Determines the file format, obtains a temporary path for the upload, and queues it for processing.
        /// </summary>
        public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
            var fileFormat = GetFileFormat(file);
            var tempFilePath = await GetTemporaryPath(file);
            // NOTE(review): DateTime.Now is formatted with the current culture here, which can
            // put '/' and ':' into the name — confirm that is acceptable where fileName is used.
            var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
            // ....... //
            ProcessFile(tempFilePath, fileFormat, file, type, userId);
        }

        /// <summary>
        /// Enqueues a work item that resolves an <see cref="IFileProcessingService"/> from a
        /// fresh scope and imports the file at <paramref name="tempFilePath"/>.
        /// </summary>
        private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId) {
            var delimiter = ",";
            // Read the size now: the work item runs later, and the IFormFile belongs to the
            // request that uploaded it, so it should not be touched from the background task.
            var fileSize = file.Length;

            Queue.QueueBackgroundWorkItem(async token => {
                using (var scope = _scopeFactory.CreateScope()) {
                    var scopedProcessingService =
                        scope.ServiceProvider.GetRequiredService<IFileProcessingService>();

                    // do the processing
                    // NOTE(review): the "csv" label is kept from the original; it must be a value
                    // of the switched type (or the switch should be on fileFormat) — confirm.
                    switch (type) {
                        case "csv":
                            await scopedProcessingService.ImportFile(tempFilePath, fileSize, userId, fileFormat, new Headers(), delimiter ?? ",", "yyyy-MM-dd");
                            break;
                    }
                }
            });
        }
    }
}
I am adding elements to the queue on each request in the controller. Now I want to add another queue for processing other requests. Is it possible to use another queue with the same hosted service? I have trouble finding examples of how to do that. Should I just add another scoped service and another background queue?
The first option is the most straightforward: you just create a bunch of classes and interfaces — QueuedHostedServiceA, QueuedHostedServiceB, IBackgroundTaskQueueA, and so on (you can use inheritance to reduce code duplication).
Alternatively, you can introduce the concept of a "handler" and make all of this generic:
/// <summary>Processes messages of type <typeparamref name="T"/>.</summary>
interface IHandler<T>
{
    /// <summary>Handles a single message.</summary>
    /// <param name="msg">The message to process.</param>
    /// <param name="cancellationToken">Token supplied by the caller.</param>
    Task Handle(T msg, CancellationToken cancellationToken);
}
/// <summary>
/// Same contract as IBackgroundTaskQueue, but carrying messages of type
/// <typeparamref name="T"/> instead of Func&lt;CancellationToken, Task&gt;.
/// </summary>
interface IBackgroundMessageQueue<T>
{
    /// <summary>Adds a message to the queue.</summary>
    void QueueBackgroundWorkItem(T workItem);

    /// <summary>Asynchronously obtains a message from the queue.</summary>
    Task<T> DequeueAsync(CancellationToken cancellationToken);
}
/// <summary>
/// Same implementation as BackgroundTaskQueue, but storing messages of type
/// <typeparamref name="T"/> instead of Func&lt;CancellationToken, Task&gt;.
/// Named without the "I" prefix so it does not collide with the interface it implements.
/// </summary>
class BackgroundMessageQueue<T> : IBackgroundMessageQueue<T>
{
    private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues a message and releases the semaphore once.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(T workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }
        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    /// <summary>Waits on the semaphore, then takes one message from the queue.</summary>
    public async Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);
        return workItem;
    }
}
/// <summary>
/// Generic hosted service: dequeues messages of type <typeparamref name="T"/> and hands
/// each one to an <see cref="IHandler{T}"/> resolved from a new scope.
/// </summary>
class QueuedHostedService<T> : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IBackgroundMessageQueue<T> _queue;
    private readonly ILogger<QueuedHostedService<T>> _logger;

    // The handler is deliberately NOT a constructor parameter: it is resolved from a
    // fresh scope for every message in ExecuteAsync, so it can be registered as scoped.
    public QueuedHostedService(
        IServiceProvider serviceProvider,
        IBackgroundMessageQueue<T> queue,
        ILogger<QueuedHostedService<T>> logger)
    {
        _serviceProvider = serviceProvider;
        _queue = queue;
        _logger = logger;
    }

    /// <summary>
    /// Dequeues and handles messages until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            T message = await _queue.DequeueAsync(cancellationToken);
            try
            {
                using (var scope = _serviceProvider.CreateScope())
                {
                    // Resolve from the scope's provider, not the root one, so the handler
                    // and its dependencies are disposed together with the scope.
                    var handler = scope.ServiceProvider.GetRequiredService<IHandler<T>>();
                    await handler.Handle(message, cancellationToken);
                }
            }
            catch (Exception ex)
            {
                // Keep the loop alive, but record the failure instead of swallowing it.
                _logger.LogError(ex, "Error occurred handling message of type {MessageType}.", typeof(T).Name);
            }
        }
    }
}
And for each message type you create your own handler:
/// <summary>
/// Message describing one file to process; carries the same values that
/// ImportService.ProcessFile receives as parameters.
/// </summary>
class ProcessFile
{
    public ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId)
    {
        TempFilePath = tempFilePath;
        FileFormat = fileFormat;
        File = file;
        Type = type;
        UserId = userId;
    }

    public string TempFilePath { get; }
    public FileFormatType FileFormat { get; }

    // NOTE(review): the handler runs after the enqueuing call has returned; confirm the
    // IFormFile is still usable at that point, or carry only the values needed (e.g. its Length).
    public IFormFile File { get; }

    public TransactionalDataFileType Type { get; }
    public int UserId { get; }
}
/// <summary>Handler for <see cref="ProcessFile"/> messages.</summary>
class FileProcessor : IHandler<ProcessFile>
{
    public Task Handle(ProcessFile msg, CancellationToken cancellationToken)
    {
        // Implement your logic from ImportService.ProcessFile here.
        throw new NotImplementedException();
    }
}
Then you register everything:
// One handler, one queue and one hosted service per message type.
services.AddScoped<IHandler<ProcessFile>, FileProcessor>();
// BackgroundMessageQueue<T> is the concrete implementation of IBackgroundMessageQueue<T>.
services.AddSingleton<IBackgroundMessageQueue<ProcessFile>, BackgroundMessageQueue<ProcessFile>>();
services.AddHostedService<QueuedHostedService<ProcessFile>>();
In your ImportService, you inject the typed queue:
public ImportService(IBackgroundMessageQueue<ProcessFile> queue)
and enqueue a message on it when needed.