How to run BackgroundService by counter using Task?


I have a background service in my .NET Core application. It should start work items based on a capacity counter.

public class WebWorkGenerator : BackgroundService
{
    private const int ConsumeCapacityPerMinute = 10;
    private  int _capacityCounter;
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            while (ConsumeCapacityPerMinute - _capacityCounter > 0)
            {
                _capacityCounter++;

                Console.WriteLine($"Working {_capacityCounter}/{ConsumeCapacityPerMinute}");

                Task.Run(ProcessItems,stoppingToken);
            }
        }

        await Task.CompletedTask;
    }

    private async Task ProcessItems()
    {
        await Task.Delay(new Random().Next(1000, 5000));
      
        _capacityCounter--;
        
        Console.WriteLine("Processed.");
    }

} 

The background job should run at most 10 items at a time, because ProcessItems() sends requests to another service whose capacity is 10.

ProcessItems() decrements the counter once an item is processed. But my output looks like this:

Working 1/10
Working 2/10
Working 3/10
Working 4/10
Working 5/10
Working 6/10
Working 7/10
Working 8/10
Working 9/10
Working 10/10

And then nothing else happens. But once Task.Run(ProcessItems,stoppingToken); has executed and an item has been processed, _capacityCounter should be decreased and the while loop should continue. How can I make this work?


Solution

  • First of all there are several problems with your code:

    • _capacityCounter is incremented/decremented in a non-thread-safe manner, which can cause concurrency problems (this can be fixed with Interlocked.Increment/Decrement)
    • ExecuteAsync does not await correctly or yield as early as possible (await Task.CompletedTask; is pointless from the point of view of returning control to the caller, and combined with hosted services it can lead to problems; see this answer for example)
    • Assuming ProcessItems is "truly" async, there is no need to wrap it in an extra Task.Run
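To illustrate the first point, here is a minimal sketch of a counter that stays consistent when many threads touch it. CapacityCounter, TryAcquire, Release and InUse are hypothetical names introduced for this example; they are not part of the original code or of any library.

```csharp
using System.Threading;

// Hypothetical helper (not from the question): a capacity counter whose
// updates go through Interlocked, so concurrent callers cannot lose an update.
public sealed class CapacityCounter
{
    private readonly int _limit;
    private int _inUse;

    public CapacityCounter(int limit) => _limit = limit;

    // Number of slots currently taken.
    public int InUse => Volatile.Read(ref _inUse);

    // Takes a slot if one is free. The increment is atomic; if it overshoots
    // the limit, the slot is handed back. Under heavy contention this can
    // report "full" slightly too eagerly, but it never grants more than _limit.
    public bool TryAcquire()
    {
        if (Interlocked.Increment(ref _inUse) <= _limit)
        {
            return true;
        }

        Interlocked.Decrement(ref _inUse);
        return false;
    }

    // Gives a slot back after an item has been processed.
    public void Release() => Interlocked.Decrement(ref _inUse);
}
```

A loop built on this would still have to poll or sleep while the counter is full, which is one reason a waitable primitive is usually more convenient here.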

    As for the question itself - if the goal is to have at most 10 concurrently running "process" tasks then SemaphoreSlim seems to be an appropriate synchronization primitive here:

    public class WebWorkGenerator : BackgroundService
    {
        private SemaphoreSlim _semaphore = new SemaphoreSlim(10);
        
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await _semaphore.WaitAsync(stoppingToken);
                _ = ProcessItems();
            }
        }
    
        private async Task ProcessItems()
        {
            try
            {
                await Task.Delay(Random.Shared.Next(1000, 5000));
            }
            finally
            {
                _semaphore.Release();
                Console.WriteLine("Processed.");
            }
        }
    } 
    

    If you want somewhat more complex logic based on ConsumeCapacityPerMinute, then you need to look into some kind of rate limiting approach, for example via the built-in rate limiter API (available since .NET 7) or a third-party library like Polly (see the example from their docs).
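As a rough sketch of the built-in option, the following uses FixedWindowRateLimiter from System.Threading.RateLimiting to allow a fixed number of starts per one-minute window. It assumes that namespace is available to the project (it ships with ASP.NET Core 7+ and as a NuGet package). PerMinuteLimiter, Create, RunAsync and the processItem delegate are hypothetical names chosen for this example.

```csharp
using System;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

// Hypothetical helper (not from the question or the answer above).
public static class PerMinuteLimiter
{
    // Builds a limiter that hands out at most permitsPerMinute leases
    // in each one-minute window.
    public static FixedWindowRateLimiter Create(int permitsPerMinute) =>
        new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
        {
            PermitLimit = permitsPerMinute,
            Window = TimeSpan.FromMinutes(1),
            // Allow callers of AcquireAsync to wait for the next window.
            QueueLimit = permitsPerMinute,
            QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
            AutoReplenishment = true
        });

    // Starts processItem each time a lease is granted, until cancellation.
    public static async Task RunAsync(
        RateLimiter limiter,
        Func<Task> processItem,
        CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Completes immediately while the window has permits left,
            // otherwise waits in the queue until the window is replenished.
            using RateLimitLease lease =
                await limiter.AcquireAsync(1, stoppingToken);

            if (lease.IsAcquired)
            {
                _ = processItem(); // fire and forget, as in the code above
            }
        }
    }
}
```

Note that this limits how many items are started per minute, not how many run at the same time. If both constraints matter, the limiter can be combined with the SemaphoreSlim approach.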