I'm getting many timeout exceptions when using the Polly Bulkhead policy. This policy helps me limit the number of concurrent calls that I send to a specific host. However, it seems that the HttpClient timeout applies to the whole chain of delegates.
I'm using IHttpClientFactory to configure that with the following code:
// Register the HttpClient under the empty name, then attach the bulkhead policy
// (built with a limit of 100) as a policy handler on that client's pipeline.
var httpClientBuilder = services.AddHttpClient(string.Empty);
httpClientBuilder.AddPolicyHandler(GetBulkheadPolicy(100));
/// <summary>
/// Builds an asynchronous Polly bulkhead policy typed for <see cref="HttpResponseMessage"/> results.
/// </summary>
/// <param name="maxConcurrentRequests">Forwarded as the first argument of <c>Policy.BulkheadAsync</c>.</param>
/// <returns>The bulkhead converted to an <c>IAsyncPolicy&lt;HttpResponseMessage&gt;</c>.</returns>
private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    // int.MaxValue is forwarded as the second argument of BulkheadAsync.
    var bulkhead = Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue);

    // The non-generic policy is adapted so it can be attached to an HttpClient pipeline.
    return bulkhead.AsAsyncPolicy<HttpResponseMessage>();
}
My problem is that I want the timeout to affect only the request itself and not the bulkhead policy, because the behaviour that I want to achieve is the following:
I have achieved that behaviour using a Semaphore
instead of a Polly Bulkhead policy, but I'd like to encapsulate that code using a Policy.
Thanks.
I have put these samples together to demonstrate different ways in which you can throttle HttpClient
requests. I have to emphasize that these are just examples and are far from production code, so please scrutinize them through that lens.
The following code samples show how to issue requests in a fire-and-forget manner (so they do not care about the responses). The solutions assume that there are more requests than the available throughput. In other words, the producer is faster than the consumer(s), which is why there is some sort of queueing mechanism to handle this imbalance.
/// <summary>
/// Throttles fire-and-forget HTTP requests by grouping them with a <see cref="BatchBlock{T}"/>
/// and sending each group through an <see cref="ActionBlock{TInput}"/>.
/// </summary>
/// <remarks>
/// Each batch is sent sequentially; up to <c>MaxParallelBatches</c> batches are processed in parallel.
/// NOTE: a <see cref="BatchBlock{T}"/> only emits once a full batch has accumulated, so a trailing
/// group smaller than <c>BatchSize</c> stays buffered until more requests arrive.
/// </remarks>
public class ThrottlingWithBatchBlock
{
    private const int BatchSize = 100;
    private const int MaxParallelBatches = 100;

    static readonly HttpClient client = new();
    private readonly BatchBlock<HttpRequestMessage> requests = new(BatchSize);
    private readonly ActionBlock<HttpRequestMessage[]> consumer;

    /// <summary>Creates the consumer block and links the batching block to it.</summary>
    public ThrottlingWithBatchBlock()
    {
        consumer = new(
            batch => ConsumerAsync(batch),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelBatches });
        requests.LinkTo(consumer);
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not returned to the caller.</summary>
    /// <param name="request">The request to enqueue.</param>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    /// <summary>Sends every request of one batch, one after another.</summary>
    /// <param name="requests">The batch handed over by the batching block.</param>
    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
        {
            try
            {
                // The response is not needed (fire and forget), so release it right away.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // An exception escaping this delegate would fault the ActionBlock, which would then
                // drop the rest of this batch and decline every later batch. Report and carry on.
                System.Diagnostics.Trace.TraceError($"Request to {request?.RequestUri} failed: {ex}");
            }
        }
    }
}
/// <summary>
/// Throttles fire-and-forget HTTP requests with a bounded <see cref="BufferBlock{T}"/>
/// that is drained by a single consumer loop.
/// </summary>
/// <remarks>
/// The consumer sends one request at a time. The bounded capacity limits how many requests may wait
/// in the buffer; <see cref="IssueNewRequest"/> awaits while the buffer is full.
/// </remarks>
public class ThrottlingWithBufferBlock
{
    private const int MaxQueuedRequests = 100;

    static readonly HttpClient client = new();
    private readonly BufferBlock<HttpRequestMessage> requests = new(
        new DataflowBlockOptions { BoundedCapacity = MaxQueuedRequests });

    /// <summary>Starts the background consumer loop.</summary>
    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not returned to the caller.</summary>
    /// <param name="request">The request to enqueue.</param>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    /// <summary>Receives queued requests and sends them one by one until the buffer completes.</summary>
    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            try
            {
                // The response is not needed (fire and forget), so release it right away.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Letting the exception escape would end this (discarded) loop for good; the bounded
                // buffer would then fill up and every producer would wait forever. Report and carry on.
                System.Diagnostics.Trace.TraceError($"Request to {request?.RequestUri} failed: {ex}");
            }
        }
    }
}
/// <summary>
/// Throttles fire-and-forget HTTP requests with a bounded <see cref="Channel{T}"/>
/// that is drained by a single consumer loop.
/// </summary>
/// <remarks>
/// The consumer sends one request at a time. The channel capacity limits how many requests may wait;
/// <see cref="IssueNewRequest"/> awaits while the channel is full.
/// </remarks>
public class ThrottlingWithChannels
{
    private const int MaxQueuedRequests = 100;

    static readonly HttpClient client = new();

    // Exactly one reader exists (the loop started in the constructor), whereas IssueNewRequest is
    // public and may be called by several producers at once, so only SingleReader can be promised.
    private readonly Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
        new BoundedChannelOptions(MaxQueuedRequests) { SingleWriter = false, SingleReader = true });

    /// <summary>Starts the background consumer loop.</summary>
    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not returned to the caller.</summary>
    /// <param name="request">The request to enqueue.</param>
    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // WriteAsync already waits for free capacity, so no separate WaitToWriteAsync is needed.
        await requests.Writer.WriteAsync(request);
    }

    /// <summary>Reads queued requests and sends them one by one until the channel completes.</summary>
    async Task ConsumerAsync()
    {
        await foreach (var request in requests.Reader.ReadAllAsync().ConfigureAwait(false))
        {
            try
            {
                // The response is not needed (fire and forget), so release it right away.
                using var response = await client.SendAsync(request).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Letting the exception escape would end this (discarded) loop for good; the bounded
                // channel would then fill up and every producer would wait forever. Report and carry on.
                System.Diagnostics.Trace.TraceError($"Request to {request?.RequestUri} failed: {ex}");
            }
        }
    }
}
/// <summary>
/// Throttles fire-and-forget HTTP requests queued in a <see cref="BlockingCollection{T}"/>,
/// keeping at most <c>MaxConcurrentRequests</c> sends in flight.
/// </summary>
/// <remarks>
/// A single dedicated dispatcher thread performs the blocking dequeue. Starting the blocking
/// consumers directly from the constructor would block inside the constructor on the empty
/// collection, and parking one pool thread per consumer would tie up the thread pool.
/// </remarks>
public class ThrottlingWithBlockingCollection
{
    private const int MaxConcurrentRequests = 100;

    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();
    private readonly SemaphoreSlim freeSlots = new(MaxConcurrentRequests, MaxConcurrentRequests);

    /// <summary>Starts the dispatcher on its own long-running task.</summary>
    public ThrottlingWithBlockingCollection()
    {
        _ = Task.Factory.StartNew(
            Dispatch,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not returned to the caller.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>An already completed task; adding to the collection is synchronous.</returns>
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    /// <summary>Dequeues requests (blocking) and starts a send for each one as soon as a slot is free.</summary>
    private void Dispatch()
    {
        foreach (var request in requests.GetConsumingEnumerable())
        {
            // Blocking here is intentional: this method owns a dedicated thread.
            freeSlots.Wait();
            _ = SendAndReleaseAsync(request);
        }
    }

    /// <summary>Sends one request and frees its concurrency slot afterwards, whatever the outcome.</summary>
    /// <param name="request">The request to send.</param>
    private async Task SendAndReleaseAsync(HttpRequestMessage request)
    {
        try
        {
            // The response is not needed (fire and forget), so release it right away.
            using var response = await client.SendAsync(request).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Nobody awaits this task, so report the failure instead of leaving it unobserved.
            System.Diagnostics.Trace.TraceError($"Request to {request?.RequestUri} failed: {ex}");
        }
        finally
        {
            freeSlots.Release();
        }
    }
}
/// <summary>
/// Throttles fire-and-forget HTTP requests queued in a <see cref="BlockingCollection{T}"/>
/// by feeding them to <c>ParallelAsyncForEach</c> with a fixed degree of parallelism.
/// </summary>
public class ThrottlingWithParallelForEach
{
    private const int MaxConcurrentRequests = 100;

    static readonly HttpClient client = new();
    private readonly BlockingCollection<HttpRequestMessage> requests = new();

    /// <summary>Starts the background pump that consumes the queue.</summary>
    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable() is required: enumerating the collection itself only walks a
        // snapshot (empty at this point), so requests added later would never be picked up.
        // Its MoveNext() blocks while the queue is empty, hence the pump is started via Task.Run
        // rather than on the constructing thread.
        _ = Task.Run(() => requests
            .GetConsumingEnumerable()
            .ParallelAsyncForEach(request => SendAsync(request), MaxConcurrentRequests));
    }

    /// <summary>Queues <paramref name="request"/> for sending; the response is not returned to the caller.</summary>
    /// <param name="request">The request to enqueue.</param>
    /// <returns>An already completed task; adding to the collection is synchronous.</returns>
    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    /// <summary>Sends one request and discards (disposes) its response.</summary>
    /// <param name="request">The request to send.</param>
    private static async Task SendAsync(HttpRequestMessage request)
    {
        try
        {
            // The response is not needed (fire and forget), so release it right away.
            using var response = await client.SendAsync(request).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Nobody inspects the outcome of an individual send, so report the failure here.
            System.Diagnostics.Trace.TraceError($"Request to {request?.RequestUri} failed: {ex}");
        }
    }
}
//Based on https://codereview.stackexchange.com/a/203487
/// <summary>Extension methods for running asynchronous work over a sequence with bounded concurrency.</summary>
public static partial class ParallelForEach
{
    /// <summary>
    /// Invokes <paramref name="body"/> for every item of <paramref name="source"/>, keeping at most
    /// <paramref name="degreeOfParallelism"/> returned tasks pending at any time.
    /// </summary>
    /// <typeparam name="T">Item type of the sequence.</typeparam>
    /// <param name="source">Items to process; enumerated lazily, one item per started job.</param>
    /// <param name="body">Asynchronous action started for each item.</param>
    /// <param name="degreeOfParallelism">Maximum number of tracked jobs; nothing is processed when it is not positive.</param>
    /// <returns>A task that completes once the source is exhausted and every started job has finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
    /// <remarks>
    /// Finished jobs are removed without being awaited, so a faulted or cancelled job does not make
    /// this method throw.
    /// </remarks>
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (body is null)
        {
            throw new ArgumentNullException(nameof(body));
        }

        var toBeProcessedJobs = new HashSet<Task>();
        using var remainingJobsEnumerator = source.GetEnumerator();
        var sourceExhausted = false;

        // Starts the job for the next item; returns false once the source has no more items.
        bool TryAddNewJob()
        {
            if (sourceExhausted || !remainingJobsEnumerator.MoveNext())
            {
                sourceExhausted = true;
                return false;
            }

            toBeProcessedJobs.Add(body(remainingJobsEnumerator.Current));
            return true;
        }

        // Initial fill. Stopping when the source runs dry is essential: otherwise a source with
        // fewer items than degreeOfParallelism (or none at all) would spin here forever.
        while (toBeProcessedJobs.Count < degreeOfParallelism && TryAddNewJob())
        {
        }

        // Every time a job finishes, replace it with the next item (if any) until all are done.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            TryAddNewJob();
        }
    }
}