My application hangs (and is eventually force-killed by Polly's circuit breaker or timeout) when trying to concurrently receive and deserialize very large JSON files, each containing over 12,000,000 characters.
using System.Text.Json;

Parallel.For(0, 1000000, async (i, state) =>
{
    var strategy = GetPollyResilienceStrategy(); // See the method implementation below.
    await strategy.ExecuteAsync(async () =>
    {
        var stream = await httpClient.GetStreamAsync(
            GetEndPoint(i), cancellationToken);
        var foo = await JsonSerializer.DeserializeAsync<Foo>(
            stream, cancellationToken: cancellationToken);
        // Process may require API calls to the client,
        // but no other API calls and JSON deserialization
        // is required after processedFoo is obtained.
        var processedFoo = Process(foo); // Long-running CPU- and IO-bound task that may involve additional API calls and JSON deserialization.
        queue.Add(processedFoo); // Consumed by a background task; implementation below.
    });
});
I use different resilience strategies: one from GetPollyResilienceStrategy to process item i, and one for the HttpClient. The former is more relevant here, so I'm sharing that, but I'm also happy to share the strategy used for the HttpClient if needed.
AsyncPolicyWrap GetPollyResilienceStrategy()
var retry = Policy
retryCount: 3,
medianFirstRetryDelay: TimeSpan.FromMinutes(1)));
var timeout = Policy.TimeoutAsync(timeout: TimeSpan.FromMinutes(5));
var circuitBreaker = Policy
failureThreshold: 0.5,
samplingDuration: TimeSpan.FromMinutes(10),
minimumThroughput: 2,
durationOfBreak: TimeSpan.FromMinutes(1));
var strategy = Policy.WrapAsync(retry, timeout, circuitBreaker);
return strategy;
The background task is implemented as follows.
var queue = new BlockingCollection<Foo>();
var listner = Task.Factory.StartNew(() =>
while (true)
Foo foo;
foo = queue.Take(cancellationToken);
catch (OperationCanceledException)
creationOptions: TaskCreationOptions.LongRunning);
My .NET console application receives a very large JSON file over HTTP and deserializes it, reading the fields the application needs and ignoring the others. A successful run is expected to take a few days. However, the program "hangs" after a few hours. After extensive debugging, it turns out that an increasing number of threads get stuck trying to deserialize the JSON, and the program hangs (i.e., the Parallel.For does not start another iteration) once about 5 threads are stuck. It gets stuck at a different i every time, and since the JSON objects are very large, it is not feasible to log every received JSON for debugging.
Why does it get stuck? Is there any built-in maximum capacity in JsonSerializer that is reached, e.g., a buffer size?
Is it possible that GetStreamAsync is reading corrupt data, so that JsonSerializer gets stuck in some corner case trying to deserialize corrupt JSON?
I found this thread relevant, though I'm not sure whether there was a resolution other than "fixed in newer version".
The program eventually exits, but only as a result of either the circuit breaker or the timeout. I have given very long intervals in the resilience strategy, e.g., giving the process 20 minutes to try deserializing the JSON before retrying.
Even without knowing how you set up your resilience strategy, it seems like you want to kill two birds with one stone: making the HTTP call resilient and making the JSON deserialization resilient.
I would recommend separating these two.
The GetStreamAsync call returns a Task<Stream>, which does not allow you to access the underlying HttpResponseMessage. But if you issue your request for the stream like this:
var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead);
using var stream = await response.Content.ReadAsStreamAsync();
then you would be able to decorate the GetAsync call with an HTTP-based Polly policy definition.
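As a rough illustration of decorating only the HTTP call, here is a minimal sketch. It assumes Polly v7 is referenced; the class name ResilientDownload, the retry count, the backoff, and the "retry on 5xx" rule are all illustrative assumptions, not something taken from your code.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public static class ResilientDownload
{
    // Assumed policy: retry on HttpRequestException or a 5xx status,
    // up to 3 times with exponential backoff (2s, 4s, 8s).
    public static readonly IAsyncPolicy<HttpResponseMessage> HttpRetry =
        Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

    // Only the request itself runs inside the policy; the body is not read here.
    public static async Task<HttpResponseMessage> GetHeadersAsync(
        HttpClient httpClient, string url, CancellationToken ct)
    {
        var response = await HttpRetry.ExecuteAsync(
            token => httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, token),
            ct);
        response.EnsureSuccessStatusCode();
        return response; // The caller disposes it after reading the content stream.
    }
}
```

The caller would then read response.Content.ReadAsStreamAsync() and hand the stream to the deserializer under a separate timeout, which keeps the two concerns apart.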
Looking at this problem purely from a resilience perspective, it would make sense to combine CancellationTokenSources to enforce a timeout, like this:
CancellationTokenSource userCancellation = ...;
var timeoutSignal = new CancellationTokenSource(TimeSpan.FromMinutes(20));
var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(userCancellation.Token, timeoutSignal.Token);
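// The token must be passed by name: the second positional parameter is JsonSerializerOptions.
var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: combinedCancellation.Token);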
But you could achieve the same with an optimistic timeout policy.
var foo = await timeoutPolicy.ExecuteAsync(
    async token => await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: token), ct);
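The snippet above does not show how timeoutPolicy is defined. A minimal sketch, assuming Polly v7, could look like the following; the Foo record and the TimedDeserializer helper are hypothetical stand-ins used only to make the example self-contained.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Polly;
using Polly.Timeout;

public sealed record Foo(int Id); // Hypothetical payload type.

public static class TimedDeserializer
{
    public static async Task<Foo?> ReadAsync(Stream stream, TimeSpan limit, CancellationToken ct)
    {
        // Optimistic strategy: Polly cancels the token it passes to the delegate
        // when the limit elapses, so the delegate must honor that token.
        AsyncTimeoutPolicy timeoutPolicy = Policy.TimeoutAsync(limit, TimeoutStrategy.Optimistic);

        // On timeout, Polly throws TimeoutRejectedException to the caller.
        return await timeoutPolicy.ExecuteAsync(
            async token => await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: token),
            ct);
    }
}
```

Because the strategy is optimistic, it only helps if the deserializer actually observes the token while it is reading from the stream.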
My understanding is that the strategy used for every item is independent of the others, correct? So if the circuit breaks, why does Parallel.ForEachAsync not continue with the others?
The timeout and retry policies are stateless, whereas the circuit breaker maintains state that is shared between executions. Here I have detailed some internals if you are interested in how it works under the hood.
Also, if the loop is broken, why no exception?
If the threshold of successive failed requests is reached, the CB transitions from Closed to Open and rethrows the original exception (if it was triggered by an exception). In the Open state, if you try to perform a new request, the CB short-circuits the execution with a BrokenCircuitException.
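To make the state transition concrete, here is a small sketch assuming Polly v7. It uses the basic CircuitBreakerAsync overload rather than the advanced one from your code, and the threshold of 2 and the CircuitBreakerDemo name are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using Polly;
using Polly.CircuitBreaker;

public static class CircuitBreakerDemo
{
    public static async Task<string[]> RunAsync()
    {
        // One policy instance, reused for every call: this is where the state lives.
        var breaker = Policy
            .Handle<InvalidOperationException>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 2,
                durationOfBreak: TimeSpan.FromMinutes(1));

        var outcomes = new string[3];
        for (var i = 0; i < 3; i++)
        {
            try
            {
                // Every call fails with the handled exception type.
                await breaker.ExecuteAsync(
                    () => Task.FromException(new InvalidOperationException("boom")));
                outcomes[i] = "ok";
            }
            catch (BrokenCircuitException) { outcomes[i] = "rejected"; } // Circuit is Open.
            catch (InvalidOperationException) { outcomes[i] = "failed"; } // Original exception.
        }
        return outcomes;
    }
}
```

The first two calls surface the original exception and the third is rejected without running the delegate. The state is held by the policy instance, so it is shared only between executions that reuse the same instance.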
So, back to your question: yes, there should be an exception. But because you have used Parallel.ForEach, which does not support async delegates (the async lambda becomes async void, so the loop does not wait for it), the exception is swallowed. If you had used await Parallel.ForEachAsync, it should throw the exception.
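A minimal sketch of the awaited variant, assuming .NET 6 or later; ProcessItemAsync is a hypothetical stand-in for the per-item work and fails for one item on purpose.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelLoopDemo
{
    // Hypothetical per-item work; item 3 fails on purpose.
    private static async ValueTask ProcessItemAsync(int i, CancellationToken ct)
    {
        await Task.Delay(10);
        if (i == 3)
            throw new InvalidOperationException($"item {i} failed");
    }

    // Returns true if the failure surfaced at the await of the loop.
    public static async Task<bool> RunAsync()
    {
        try
        {
            await Parallel.ForEachAsync(
                Enumerable.Range(0, 10),
                new ParallelOptions { MaxDegreeOfParallelism = 4 },
                ProcessItemAsync);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Because the loop returns a Task that is awaited, a failing body faults that Task and the exception reaches the surrounding try/catch.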
After assessing your GetPollyResilienceStrategy code, I have two more pieces of advice:
- Change the return type from AsyncPolicyWrap: it is an implementation detail and should not be exposed.
- Change the policy order to Policy.WrapAsync(retry, circuitBreaker, timeout); as well.