Search code examples
c#httpconcurrencythrottlingrate

Best way to throttle outgoing requests in C# by time and some extra criteria?


I have to call an external HTTP API that only allows one request every 4 seconds by userId. As long as I'm calling this API sending a different userId each time, I can call it at any rate.

In this code I'm able to comply with the external API rate, but I'm not doing it in the optimal way, since some requests get blocked by previous calls even if that userId doesn't need to wait. (Check comments in code)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    class Program
    {
        static void Main(string[] args)
        {
            var caller = new ExternalAPICaller();

            caller.RunCalls();

            Console.ReadKey();
        }
    }

    public class ExternalAPICaller
    {
        private static SemaphoreSlim throttler = new SemaphoreSlim(20); // up to 20 concurrent calls

        private static List<string> userIds = new List<string>();

        private const int rateLimitByUser = 4000;

        public async Task CallAPIWithThrottling(string userId)
        {
            if (userIds.Contains(userId)) Thread.Sleep(rateLimitByUser);

            userIds.Add(userId);

            await throttler.WaitAsync();

            var task = MockHttpCall(userId);

            _ = task.ContinueWith(async s =>
            {
                await Task.Delay(rateLimitByUser);
                throttler.Release();
                userIds.Remove(userId);
            });
        }

        public Task MockHttpCall(string id)
        {
            Console.WriteLine("http call for " + id);
            Thread.Sleep(300);
            return Task.CompletedTask;
        }

        public async Task RunCalls()
        {
            await CallAPIWithThrottling("Mike");
            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Sarah");
            await CallAPIWithThrottling("Matt");

            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Jacob"); // this should be called right away, but the second John makes it wait

            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Amy"); // this should be called right away, but the thrid John makes it wait
        }
    }
}


Solution

  • I would try to abstract the throttling functionality, so that I can test it independently. I would make a Throttler class that can be configured with global and per-user concurrency limits and delays. In your case the configuration would be:

    • Global concurrency limit: 20
    • Global delay: 0 (simultaneous requests for different users are allowed)
    • Per user concurrency limit: 1
    • Per user delay: 4000

    Here is an implementation of the Throttler class. The per user concurrency limit is omitted for simplicity (it would require a second SemaphoreSlim per user).

    public class Throttler<TKey>
    {
        private readonly SemaphoreSlim _globalConcurrencySemaphore;
        private readonly SemaphoreSlim _globalDelaySemaphore;
        private readonly int _globalDelay;
        private readonly int _perKeyDelay;
        private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _perKeyDelaySemaphores;
    
        public Throttler(int globalConcurrencyLimit, int globalDelay, int perKeyDelay)
        {
            _globalConcurrencySemaphore = new SemaphoreSlim(globalConcurrencyLimit,
                 globalConcurrencyLimit);
            _globalDelaySemaphore = new SemaphoreSlim(1, 1);
            _globalDelay = globalDelay;
            _perKeyDelay = perKeyDelay;
            _perKeyDelaySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>();
        }
    
        public async Task<TResult> Execute<TResult>(TKey key,
            Func<Task<TResult>> taskFactory)
        {
            var perKeyDelaySemaphore = _perKeyDelaySemaphores.GetOrAdd(
                key, _ => new SemaphoreSlim(1, 1));
            await perKeyDelaySemaphore.WaitAsync().ConfigureAwait(false);
            ReleaseAsync(perKeyDelaySemaphore, _perKeyDelay);
            await _globalDelaySemaphore.WaitAsync().ConfigureAwait(false);
            ReleaseAsync(_globalDelaySemaphore, _globalDelay);
            await _globalConcurrencySemaphore.WaitAsync().ConfigureAwait(false);
            try
            {
                var task = taskFactory();
                return await task.ConfigureAwait(false);
            }
            finally
            {
                _globalConcurrencySemaphore.Release();
            }
        }
    
        private async void ReleaseAsync(SemaphoreSlim semaphore, int delay)
        {
            await Task.Delay(delay).ConfigureAwait(false);
            semaphore.Release();
        }
    }
    

    The delay is between one semaphore acquirement and the next. The delay of the HTTP call is not taken into account.

    Usage example:

    var throttler = new Throttler<string>(20, 0, 4000);
    var keys = new string[] { "Mike", "John", "Sarah", "Matt", "John", "Jacob",
        "John", "Amy" };
    var tasks = new List<Task>();
    foreach (var key in keys)
    {
        tasks.Add(throttler.Execute(key, () => MockHttpCall(key)));
    }
    Task.WaitAll(tasks.ToArray());
    
    async Task<int> MockHttpCall(string id)
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} HTTP call for " + id);
        await Task.Delay(300);
        return 0;
    }
    

    Output:

    11:20:41.635 HTTP call for Mike
    11:20:41.652 HTTP call for John
    11:20:41.652 HTTP call for Sarah
    11:20:41.652 HTTP call for Matt
    11:20:41.653 HTTP call for Jacob
    11:20:41.654 HTTP call for Amy
    11:20:45.965 HTTP call for John
    11:20:50.272 HTTP call for John