Search code examples
c#dotnet-httpclientpollybulkhead

HttpClient TimeOut and Polly Bulkhead Policy problem


I'm getting many timeout exceptions using the Polly Bulkhead policy. This policy helps me limit the number of concurrent calls that I'm sending to a specific host. However, it seems that the HttpClient.Timeout value applies to the whole delegate pipeline — including time spent queued in the bulkhead — and not just to the request itself.

I'm using IHttpClientFactory to configure that with the following code:

// Registers the default (unnamed) HttpClient and attaches the bulkhead
// policy to its message-handler pipeline.
services.AddHttpClient(string.Empty)
.AddPolicyHandler(GetBulkheadPolicy(100));


// Builds a bulkhead policy: at most maxConcurrentRequests execute at once,
// with an effectively unbounded queue (int.MaxValue queued actions) behind them.
// NOTE(review): because the policy runs inside the HttpClient handler pipeline,
// HttpClient.Timeout also covers time spent waiting in this queue — which is
// exactly the problem described above.
private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

My problem is that I want the timeout to apply only to the request itself and not to the bulkhead policy, because the behaviour that I want to achieve is the following:

  • Limit the number of concurrent requests to a specific host
  • Wait indefinitely until there is capacity to send the request (when the queue is full, Polly will raise an exception)
  • Send the request to the host and apply a timeout, for example the default one.

I have achieved that behaviour using a Semaphore instead of a Bulkhead Polly Policy, but I'd like to encapsulate that code using a Policy.

Thanks.


Solution

  • I have put these samples together to demonstrate the different options for throttling HttpClient requests. I have to emphasize that these are just examples and are far from production code, so please scrutinize them through that glass.

    The following sample codes show how to issue requests in a fire-and-forget manner (so they do not care about the responses). The solutions assume that there are more requests than the available throughput. In other words, the producer is faster than the consumer(s); that's why there is some sort of queueing mechanism to handle this imbalance.

    With Batch and Action Blocks

    // Throttles outgoing requests by grouping them into batches of 100 with a
    // BatchBlock and dispatching each batch to an ActionBlock that may run up
    // to 100 batches concurrently. Responses are discarded (fire-and-forget).
    public class ThrottlingWithBatchBlock
    {
        // Single shared HttpClient instance (reused for all requests).
        static readonly HttpClient client = new();
        // Collects incoming requests and emits them as arrays of 100.
        private readonly BatchBlock<HttpRequestMessage> requests = new(100);
        private ActionBlock<HttpRequestMessage[]> consumer;
    
        public ThrottlingWithBatchBlock()
        {
            // Up to 100 batches can be processed in parallel; within one batch
            // the requests are sent sequentially (see ConsumerAsync).
            consumer = new(
                reqs => ConsumerAsync(reqs),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
            requests.LinkTo(consumer);
        }
    
        // Queues a request; completes once the batch block has accepted it.
        public async Task IssueNewRequest(HttpRequestMessage request)
        {
            await requests.SendAsync(request);
        }
    
        // Sends each request of the batch one after another, ignoring the
        // response. NOTE(review): an exception thrown here faults the
        // ActionBlock and stops further processing — confirm that is intended.
        private async Task ConsumerAsync(HttpRequestMessage[] requests)
        {
            foreach (var request in requests)
                await client.SendAsync(request).ConfigureAwait(false);
        }
    }
    

    With Buffer Block

    // Throttles requests with a bounded BufferBlock: producers wait (inside
    // SendAsync) once 100 requests are queued; a single background consumer
    // drains the queue and sends requests one at a time, discarding responses.
    public class ThrottlingWithBufferBlock
    {
        // Single shared HttpClient instance (reused for all requests).
        static readonly HttpClient client = new();
        // Bounded to 100 queued requests; SendAsync waits when the buffer is full.
        private readonly BufferBlock<HttpRequestMessage> requests = new(
                new DataflowBlockOptions { BoundedCapacity = 100 });
    
        public ThrottlingWithBufferBlock()
        {
            // Fire-and-forget consumer loop; its task (and any exception in it)
            // is intentionally not observed in this sample.
            _ = ConsumerAsync();
        }
    
        // Queues a request; completes once the buffer has accepted it.
        public async Task IssueNewRequest(HttpRequestMessage request)
        {
            await requests.SendAsync(request);
        }
    
        // Single sequential consumer: only one request is in flight at a time.
        async Task ConsumerAsync()
        {
            // OutputAvailableAsync returns false only when the block is
            // completed and empty, which ends the loop.
            while (await requests.OutputAvailableAsync())
            {
                var request = await requests.ReceiveAsync();
                await client.SendAsync(request).ConfigureAwait(false);
            }
        }
    }
    

    With Channels

    // Throttles requests with a bounded channel: writers wait once 100
    // requests are queued; a single background consumer drains the channel
    // and sends each request, discarding the response.
    public class ThrottlingWithChannels
    {
        // Single shared HttpClient instance (reused for all requests).
        static readonly HttpClient client = new();
        // NOTE(review): SingleWriter = true assumes only one thread ever calls
        // IssueNewRequest — confirm against the callers.
        private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
                new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });
    
        public ThrottlingWithChannels()
        {
            // Fire-and-forget consumer loop; its task (and any exception in it)
            // is intentionally not observed in this sample.
            _ = ConsumerAsync();
        }
    
        // Queues a request; completes once the channel has accepted it.
        public async Task IssueNewRequest(HttpRequestMessage request)
        {
            // WriteAsync on a bounded channel already waits for free capacity,
            // so the previous extra WaitToWriteAsync call was redundant — and
            // its bool result was discarded, which would have masked a
            // completed channel before WriteAsync threw.
            await requests.Writer.WriteAsync(request);
        }
    
        // Single sequential consumer: only one request is in flight at a time.
        async Task ConsumerAsync()
        {
            // WaitToReadAsync completes with false only when the channel is
            // completed and empty, which ends the loop.
            while (await requests.Reader.WaitToReadAsync())
            {
                var request = await requests.Reader.ReadAsync();
                await client.SendAsync(request).ConfigureAwait(false);
            }
        }
    }
    

    With Blocking Collection

    // Throttles requests with a BlockingCollection drained by 100 long-running
    // consumer loops; responses are discarded (fire-and-forget).
    public class ThrottlingWithBlockingCollection
    {
        // Single shared HttpClient instance (reused for all requests).
        static readonly HttpClient client = new();
        private BlockingCollection<HttpRequestMessage> requests = new();
    
        public ThrottlingWithBlockingCollection()
        {
            // Task.Run is essential here: ConsumerAsync blocks synchronously on
            // Take() before its first await, so invoking it inline (as the
            // original did) blocked the constructor forever on an empty
            // collection. Running it on the thread pool lets the constructor
            // return immediately.
            _ = Enumerable.Range(1, 100)
                .Select(_ => Task.Run(ConsumerAsync)).ToArray();
        }
    
        // Adds a request to the queue; Add only blocks if a bound were set
        // (none is), so this completes synchronously.
        public Task IssueNewRequest(HttpRequestMessage request)
        {
            requests.Add(request);
            return Task.CompletedTask;
        }
    
        // Endless consumer loop: Take() blocks the (thread-pool) thread until
        // a request is available, then the request is sent and the response
        // discarded.
        async Task ConsumerAsync()
        {
            while (true)
            {
                var request = requests.Take();
                await client.SendAsync(request).ConfigureAwait(false);
            }
        }
    }
    

    With Parallel Foreach

    // Throttles requests by feeding a BlockingCollection through the
    // ParallelAsyncForEach helper with a degree of parallelism of 100.
    public class ThrottlingWithParallelForEach
    {
        // Single shared HttpClient instance (reused for all requests).
        static readonly HttpClient client = new();
        private BlockingCollection<HttpRequestMessage> requests = new();
    
        public ThrottlingWithParallelForEach()
        {
            // GetConsumingEnumerable() is required: enumerating the
            // BlockingCollection directly (as the original did) only takes a
            // snapshot of its current — empty — contents, so no consumer ever
            // saw a request. Task.Run keeps the consuming enumerable's
            // blocking MoveNext off the constructor's thread.
            _ = Task.Run(() => requests.GetConsumingEnumerable()
                .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100));
        }
    
        // Adds a request to the queue; Add only blocks if a bound were set
        // (none is), so this completes synchronously.
        public Task IssueNewRequest(HttpRequestMessage request)
        {
            requests.Add(request);
            return Task.CompletedTask;
        }
    }
    
    //Based on https://codereview.stackexchange.com/a/203487
    public static partial class ParallelForEach
    {
        /// <summary>
        /// Runs <paramref name="body"/> for every element of <paramref name="source"/>,
        /// keeping at most <paramref name="degreeOfParallelism"/> tasks in flight at once.
        /// Completes when all started tasks have completed.
        /// </summary>
        public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
        {
            var inFlight = new HashSet<Task>();
            // Dispose the enumerator when done (the original leaked it).
            using var enumerator = source.GetEnumerator();
    
            // Starts the next job if any items remain; returns false once the
            // source is exhausted.
            bool TryStartNextJob()
            {
                if (!enumerator.MoveNext())
                    return false;
                inFlight.Add(body(enumerator.Current));
                return true;
            }
    
            // Prime up to degreeOfParallelism jobs. The original looped on the
            // count alone, which spun forever when the source held fewer items
            // than degreeOfParallelism (the count could never grow).
            while (inFlight.Count < degreeOfParallelism && TryStartNextJob())
            {
            }
    
            // As each job finishes, replace it with the next one until both the
            // source and the in-flight set are drained.
            while (inFlight.Count > 0)
            {
                Task completed = await Task.WhenAny(inFlight).ConfigureAwait(false);
                inFlight.Remove(completed);
                TryStartNextJob();
            }
        }
    }