Search code examples
c# asp.net-core .net-core queue backgroundworker

Adding multiple queues in hosted service


I am implementing queues following the example at https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio.

this is how my code looks:

In Startup.cs I am adding my hosted service and background queue:

// Hosted worker that dequeues and runs the queued work items.
services.AddHostedService<QueuedHostedService>();
// Single shared queue instance that the worker and the producers both receive.
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

Then I implement the scoped service, hosted service and background queue as follows:

namespace Services.Services {
  /// <summary>
  /// Background service that repeatedly dequeues work items from
  /// <see cref="TaskQueue"/> and awaits them one at a time until the host
  /// signals cancellation.
  /// </summary>
  public class QueuedHostedService: BackgroundService {
    private readonly ILogger _logger;
    // Stored for later use; nothing in this class reads it yet.
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates the service around the queue it will drain.</summary>
    /// <param name="serviceProvider">Provider kept on the instance.</param>
    /// <param name="taskQueue">Queue the work items are dequeued from.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    public QueuedHostedService(IServiceProvider serviceProvider, IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
      _serviceProvider = serviceProvider;
      TaskQueue = taskQueue;
      _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>Queue this service reads its work items from.</summary>
    public IBackgroundTaskQueue TaskQueue {
      get;
    }

    /// <summary>
    /// Dequeues and runs work items until <paramref name="cancellationToken"/>
    /// is signalled. A failing work item is logged and does not stop the loop.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the dequeue call and to each work item.</param>
    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
      while (!cancellationToken.IsCancellationRequested) {
        var workItem = await TaskQueue.DequeueAsync(cancellationToken);

        try {
          await workItem(cancellationToken);
        } catch (Exception ex) {
          // Keep the loop alive, but never drop the failure silently.
          _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
      }
    }
  }
}


/// <summary>
/// Contract for a queue of background work items, where each item is a
/// delegate that takes a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue {
  /// <summary>Adds a work item to the queue.</summary>
  /// <param name="workItem">Delegate to enqueue.</param>
  void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

  /// <summary>Asynchronously obtains a work item from the queue.</summary>
  /// <param name="cancellationToken">Token for cancelling the dequeue operation.</param>
  /// <returns>A task whose result is the dequeued work item.</returns>
  Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

namespace Services.Services {
  /// <summary>
  /// In-memory <see cref="IBackgroundTaskQueue"/>: work items are kept in a
  /// <see cref="ConcurrentQueue{T}"/> and a <see cref="SemaphoreSlim"/> is
  /// released once per enqueued item so that dequeuing waits for work.
  /// </summary>
  public class BackgroundTaskQueue: IBackgroundTaskQueue {
    // Both objects live as long as the queue and are never replaced.
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Enqueues a work item and signals that one more item is available.</summary>
    /// <param name="workItem">Delegate to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
      if (workItem == null) {
        throw new ArgumentNullException(nameof(workItem));
      }
      _workItems.Enqueue(workItem);
      _signal.Release();
    }

    /// <summary>
    /// Waits on the semaphore, then takes one work item off the queue.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
    /// <returns>The dequeued work item.</returns>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
      await _signal.WaitAsync(cancellationToken);
      _workItems.TryDequeue(out var workItem);
      return workItem;
    }
  }
}


// scoped service
namespace Services.Services {
  /// <summary>
  /// Import service that either runs a file import directly through the
  /// injected processing service or queues the import as a background work item.
  /// </summary>
  public class ImportService: BaseService, IImportService {
    private readonly IFileProcessingService _scopedProcessingService;

    private readonly ConfigurationSettings _configurationSettings;

    /// <summary>Queue that background import work items are pushed onto.</summary>
    public IBackgroundTaskQueue Queue {
      get;
    }

    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    /// <summary>Provider used to create a fresh scope inside each queued work item.</summary>
    public IServiceProvider Services {
      get;
    }

    /// <summary>Resolves the service's dependencies from <paramref name="services"/>.</summary>
    /// <param name="services">Provider passed to the base class and kept in <see cref="Services"/>.</param>
    /// <param name="queue">Background queue kept in <see cref="Queue"/>.</param>
    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue): base(services) {
      Services = services;
      _configurationSettings = services.GetService<ConfigurationSettings>();
      _scopedProcessingService = services.GetProcessingService();
      Queue = queue;
    }

    // ---- Main file
    /// <summary>Forwards the import to the injected processing service and awaits it.</summary>
    public async Task ImportFile(string filePath, long fileSize, int userId, FileFormatType fileFormat, TransactionsDataHeadersMap dataHeadersMap, string delimiter, string dateFormat) {
      await _scopedProcessingService.ImportFile(filePath, fileSize, userId, fileFormat, dataHeadersMap, delimiter, dateFormat);
    }

    /// <summary>
    /// Determines the file format and a temporary path for <paramref name="file"/>,
    /// then queues it for background processing.
    /// </summary>
    public async Task UploadToBlobStorage(IFormFile file, int userId, TransactionalDataFileType type) {
      var fileFormat = GetFileFormat(file);
      var tempFilePath = await GetTemporaryPath(file);
      var fileName = userId.ToString() + "-" + DateTime.Now + "." + fileFormat;
      // ....... //

      ProcessFile(tempFilePath, fileFormat, file, type, userId);
    }

    /// <summary>
    /// Queues a work item that creates its own DI scope, resolves an
    /// <see cref="IFileProcessingService"/> from it and imports the file.
    /// </summary>
    private void ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId) {
      var delimiter = ",";
      // Read the length now so the work item does not hold on to the request-bound IFormFile.
      var fileSize = file.Length;
      Queue.QueueBackgroundWorkItem(async token => {
        using(var scope = Services.CreateScope()) {
          var scopedProcessingService =
            scope.ServiceProvider
            .GetRequiredService<IFileProcessingService>();

          // do the processing
          // NOTE(review): the "csv" label only compiles if `type` can be matched against a string — confirm the intended type/label.
          switch (type) {
            case "csv":
              await scopedProcessingService.ImportFile(tempFilePath, fileSize, userId, fileFormat, new Headers(), delimiter ?? ",", "yyyy-MM-dd");
              break;
          }
        }
      });
    }
  }
}

I am adding elements to the queue on request in a controller. Now I want to add another queue for processing other requests. Is it possible to use another queue with the same hosted service? I have trouble finding examples of how to do that. Should I just add another scoped service and another background queue?


Solution

    1. The first option is the most straightforward: you just create a bunch of classes and interfaces — QueuedHostedServiceA, QueuedHostedServiceB, IBackgroundTaskQueueA, and so on (you can use inheritance to reduce code duplication).

    2. Alternatively, you can introduce the concept of a "handler" and make all of this generic:

    // Handles one message of type T taken from the background queue.
    interface IHandler<T> { Task Handle(T msg, CancellationToken cancellationToken); }
    interface IBackgroundMessageQueue<T> {...} // same members as IBackgroundTaskQueue but with T instead of Func<CancellationToken,Task>
    class BackgroundMessageQueue<T> : IBackgroundMessageQueue<T> {...} // same impl as BackgroundTaskQueue but with T instead of Func<CancellationToken,Task>
    
    // Generic worker: dequeues messages of type T and hands each one to an
    // IHandler<T> resolved from a fresh DI scope.
    class QueuedHostedService<T> : BackgroundService
    {
       // The handler is deliberately NOT a constructor parameter: it is registered
       // as scoped, while the hosted service lives for the whole application, so it
       // is resolved from a new scope for every message instead.
       public QueuedHostedService(..., IBackgroundMessageQueue<T> queue) {... }
       protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
          while (!cancellationToken.IsCancellationRequested) {
            T message = await queue.DequeueAsync(cancellationToken);
    
            try {
                using(var scp = serviceProvider.CreateScope())
                {
                    // Resolve from the scope just created so scoped dependencies are disposed with it.
                    var handler = scp.ServiceProvider.GetRequiredService<IHandler<T>>();
                    await handler.Handle(message, cancellationToken);
                }
            } catch (Exception ex) {
                // Log ex here (e.g. with an injected ILogger) instead of swallowing it silently.
            }
          }
        }
    }
    

    And for each message type you create your own handler:

    // Message describing one file to process; the handler reads its values through the properties.
    // NOTE(review): the message carries the request's IFormFile into background work — confirm it is still usable there.
    class ProcessFile
    {
        public ProcessFile(string tempFilePath, FileFormatType fileFormat, IFormFile file, TransactionalDataFileType type, int userId)
        {
            TempFilePath = tempFilePath;
            FileFormat = fileFormat;
            File = file;
            Type = type;
            UserId = userId;
        }

        public string TempFilePath { get; }
        public FileFormatType FileFormat { get; }
        public IFormFile File { get; }
        public TransactionalDataFileType Type { get; }
        public int UserId { get; }
    }
    
    class FileProcessor : IHandler<ProcessFile> { /* implement your logic from ImportService.ProcessFile */ }
    

    Then you register everything:

    // Handler is scoped; the worker resolves it from a new scope per message.
    services.AddScoped<IHandler<ProcessFile>, FileProcessor>();
    // One shared typed queue; the interface name matches what ImportService and the worker request.
    services.AddSingleton<IBackgroundMessageQueue<ProcessFile>, BackgroundMessageQueue<ProcessFile>>();
    
    services.AddHostedService<QueuedHostedService<ProcessFile>>();
    

    and in your ImportService you resolve the typed queue:

    public ImportService(IBackgroundMessageQueue<ProcessFile> queue) 
    

    and enqueue a message into it when needed.