Search code examples
asp.net-corequeueasp.net-web-api2

.NET core web api with queue processing


How to setup a .NET core web api that

  • accepts a string value,
  • puts into a queue
  • and return flag that message is accepted (regardless it is processed).

Also, a routine which keeps checking the queue, and process the messages one by one.

As per the requirement, the api is going to act as the receiver of messages which may get hits as much as hundreds of times in a minute, while the messages it receives should be processed one by one. I am bit new to web apis, so wonder if such setup is good to have and if yes how to put together different components.

Thanks in advance..


Solution

  • Honestly, I don't think that it makes sense to receive and process messages in one process, so I would recommend to use external messaging system like RabbitMQ or Kafka or any other existing system of your preference, where you can put your messages and another process would consume it. It's quite big topic, you can start from this tutorial

    If you still want to have it in one process it's also possible, you can create a background task queue, put there your messages and create background task which will consume them from that queue.

    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
    
        Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken);
    }
    
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
            new ConcurrentQueue<Func<CancellationToken, Task>>();
        private SemaphoreSlim _signal = new SemaphoreSlim(0);
    
        public void QueueBackgroundWorkItem(
            Func<CancellationToken, Task> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }
    
            _workItems.Enqueue(workItem);
            _signal.Release();
        }
    
        public async Task<Func<CancellationToken, Task>> DequeueAsync(
            CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
    
            return workItem;
        }
    }
    

    Background task:

    public class QueuedHostedService : BackgroundService
    {
        private readonly ILogger _logger;
    
        public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
            ILoggerFactory loggerFactory)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }
    
        public IBackgroundTaskQueue TaskQueue { get; }
    
        protected async override Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");
    
            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
    
                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, 
                       $"Error occurred executing {nameof(workItem)}.");
                }
            }
    
            _logger.LogInformation("Queued Hosted Service is stopping.");
        }
    }
    

    Registration:

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHostedService<QueuedHostedService>();
        services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
    }
    

    Inject to controller:

    public class ApiController
    {
        private IBackgroundTaskQueue queue;
        public ApiController(IBackgroundTaskQueue queue)
        {
            this.queue = queue;
        }
    
        public IActionResult StartProcessing()
        {
            queue.QueueBackgroundWorkItem(async token =>
            {
                // put processing code here
            }
    
            return Ok();
        }
    }
    

    You can modify BackgroundTaskQueue to fit your requirements, but I hope you understand the idea behind this.