c# json concurrency system.text.json polly

Parsing multiple very large JSON files concurrently


My application hangs (and is eventually force-killed by Polly's circuit breaker or timeout) when trying to concurrently receive and deserialize very large JSON files, each containing over 12,000,000 characters.

using System.Text.Json;

Parallel.For(0, 1000000, async (i, state) =>
{
    var strategy = GetPollyResilienceStrategy(); // Implementation shown below.
    await strategy.ExecuteAsync(async () =>
    {
        var stream = await httpClient.GetStreamAsync(
            GetEndPoint(i), cancellationToken);
        var foo = await JsonSerializer.DeserializeAsync<Foo>(
            stream, cancellationToken: cancellationToken);

        // Process is a long-running CPU- and IO-bound step that may itself make
        // additional API calls to the client and deserialize more JSON. Once
        // processedFoo is obtained, no further API calls or deserialization are needed.
        var processedFoo = Process(foo);
        queue.Add(processedFoo); // Consumed by a background task, shown below.
    });
});

I use different resilience strategies, one from GetPollyResilienceStrategy to process the item i, and one for the HttpClient. The former is more relevant here, so I'm sharing that but I'm also happy to share the strategy used for the HttpClient if needed.

AsyncPolicyWrap GetPollyResilienceStrategy()
{
    var retry = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
            retryCount: 3,
            medianFirstRetryDelay: TimeSpan.FromMinutes(1)));

    var timeout = Policy.TimeoutAsync(timeout: TimeSpan.FromMinutes(5));

    var circuitBreaker = Policy
        .Handle<Exception>()
        .AdvancedCircuitBreakerAsync(
            failureThreshold: 0.5,
            samplingDuration: TimeSpan.FromMinutes(10),
            minimumThroughput: 2,
            durationOfBreak: TimeSpan.FromMinutes(1));

    var strategy = Policy.WrapAsync(retry, timeout, circuitBreaker);

    return strategy;
}

The background task is implemented as follows.

var queue = new BlockingCollection<Foo>();

var listener = Task.Factory.StartNew(() =>
{
    while (true)
    {
        Foo foo;
        try
        {
            foo = queue.Take(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            break;
        }

        LongRunningCpuBoundTask(foo);
        SerializeToCsv(foo);
    }
},
creationOptions: TaskCreationOptions.LongRunning);

Context

My .NET console application receives very large JSON files over HTTP and deserializes each one, reading only the fields the application needs and ignoring the rest. A successful run is expected to take a few days. However, the program "hangs" after a few hours. After extensive debugging, it turns out that an increasing number of threads get stuck trying to deserialize the JSON, and the program hangs (i.e., the Parallel.For does not start another iteration) once ~5 threads are stuck. It gets stuck at a different i every time, and since the JSON objects are very large, it is not feasible to log every received JSON for debugging.

  1. Why does it get stuck? Is there any built-in max capacity in JsonSerializer that is reached? e.g., buffer size?

  2. Is it possible that GetStreamAsync is reading corrupt data, hence JsonSerializer is stuck in some corner case trying to deserialize a corrupt JSON?

I found this thread relevant, though I am not sure whether there was a resolution other than "fixed in a newer version": https://github.com/dotnet/runtime/issues/41604

The program eventually exits, but only as a result of either the circuit breaker or the timeout. I have given very long intervals in the resilience strategy, e.g., giving the process 20 minutes to try deserializing the JSON before retrying.


Solution

  • Even without knowing how you set up your resilience strategy, it seems like you want to kill two birds with one stone:

    • Add resilient behaviour for the HTTP-based communication
    • Add resilient behaviour for the stream parsing

    I would recommend separating these two.

    GetStreamAsync

    The GetStreamAsync call returns a Task<Stream> which does not allow you to access the underlying HttpResponseMessage.

    But if you issue your request for the stream like this:

    var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead);
    using var stream = await response.Content.ReadAsStreamAsync();
    

    then you would be able to decorate the GetAsync call with an HTTP-based Polly policy definition.
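As a minimal sketch of that separation, assuming Polly v7, the retry policy below wraps only the GetAsync call, and the stream is read and deserialized outside of it. The names FooClient, FetchAsync, and HttpRetry, the Foo shape, the set of status codes treated as transient, and the backoff values are all illustrative assumptions, not part of the original code.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public sealed record Foo(int Id); // hypothetical payload type

public static class FooClient
{
    // Covers only the HTTP exchange: transport errors, 5xx and 408 responses.
    // The status codes and delays are assumptions chosen for illustration.
    private static readonly IAsyncPolicy<HttpResponseMessage> HttpRetry =
        Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500
                        || r.StatusCode == HttpStatusCode.RequestTimeout)
            .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

    public static async Task<Foo?> FetchAsync(
        HttpClient httpClient, Uri url, CancellationToken ct)
    {
        // ResponseHeadersRead returns as soon as the headers arrive,
        // so the large body is streamed rather than buffered.
        using var response = await HttpRetry.ExecuteAsync(
            token => httpClient.GetAsync(
                url, HttpCompletionOption.ResponseHeadersRead, token),
            ct);
        response.EnsureSuccessStatusCode();

        // Parsing happens outside the HTTP policy and can get its own timeout.
        await using var stream = await response.Content.ReadAsStreamAsync(ct);
        return await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: ct);
    }
}
```

With this split, a failure while reading or parsing the body is not retried by the HTTP policy. Whether it should be retried at all is a separate decision.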

    DeserializeAsync

    Looking at this problem purely from a resilience perspective, it would make sense to combine CancellationTokenSources to enforce a timeout, like this:

    CancellationTokenSource userCancellation = ...;
    var timeoutSignal = new CancellationTokenSource(TimeSpan.FromMinutes(20));
    var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(userCancellation.Token, timeoutSignal.Token);
    ... 
    
    var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: combinedCancellation.Token);
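One practical detail with linked tokens is telling a timeout apart from a user-initiated cancellation. The sketch below is one possible way to do that using only the base class library. The helper name TimedJson.DeserializeAsync and the choice to surface the deadline as a TimeoutException are assumptions made for illustration.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class TimedJson
{
    // Deserializes with a per-call deadline layered on top of the caller's token.
    public static async Task<T?> DeserializeAsync<T>(
        Stream stream, TimeSpan timeout, CancellationToken userToken)
    {
        // The linked source is cancelled when userToken is cancelled
        // or when the deadline set by CancelAfter elapses.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(userToken);
        linked.CancelAfter(timeout);
        try
        {
            return await JsonSerializer.DeserializeAsync<T>(
                stream, cancellationToken: linked.Token);
        }
        catch (OperationCanceledException) when (!userToken.IsCancellationRequested)
        {
            // Only the deadline fired: report a timeout, not a user cancellation.
            throw new TimeoutException($"Deserialization exceeded {timeout}.");
        }
    }
}
```

Cancellation here is cooperative. The token is passed down to the stream reads, so a read that ignores the token will not be interrupted by it.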
    

    But you could achieve the same with an optimistic timeout policy.

    var foo = await timeoutPolicy.ExecuteAsync(
      async token => await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: token), ct);
    

    UPDATE #1

    My understanding is the strategy used for every item is independent from the other, correct? So if the circuit breaks, why the parallel.ForEachAsync does not continue with the others?

    The timeout and retry policies are stateless, whereas the circuit breaker maintains state that is shared between executions. I have detailed some of the internals here, if you are interested in how it works under the hood.

    Also, if the loop is broken, why no exception?

    If the failure threshold is reached, the CB transitions from Closed to Open and rethrows the original exception (if the transition was triggered by an exception). While the CB is Open, any new request is short-circuited with a BrokenCircuitException.
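The state transition can be observed with a small experiment. This sketch assumes Polly v7 and, to keep the behaviour easy to follow, uses the simpler consecutive-count CircuitBreakerAsync rather than the AdvancedCircuitBreakerAsync from the question. BreakerDemo and the simulated failure are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Polly;
using Polly.CircuitBreaker;

public static class BreakerDemo
{
    public static async Task<List<string>> RunAsync()
    {
        // One shared instance: its state is visible to every execution through it.
        var breaker = Policy
            .Handle<InvalidOperationException>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 2,
                durationOfBreak: TimeSpan.FromSeconds(30));

        var outcomes = new List<string>();
        for (var attempt = 1; attempt <= 3; attempt++)
        {
            try
            {
                await breaker.ExecuteAsync(() => Task.FromException(
                    new InvalidOperationException("simulated failure")));
            }
            catch (BrokenCircuitException)
            {
                // Circuit is Open: the delegate was not invoked at all.
                outcomes.Add("short-circuited");
            }
            catch (InvalidOperationException)
            {
                // Circuit was Closed: the original failure is rethrown.
                outcomes.Add("original exception");
            }
        }
        return outcomes;
    }
}
```

The first two attempts surface the original exception, and the second one opens the circuit, so the third attempt is rejected with a BrokenCircuitException. State is shared only between executions that go through the same policy instance. A breaker created anew for each item starts out Closed every time.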

    So, back to your question: yes, there should be an exception, but because you used Parallel.For, which does not support async delegates, the exception is swallowed. If you had used await Parallel.ForEachAsync, it would have thrown the exception.
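A minimal sketch of the awaited loop, assuming .NET 6 or later, where Parallel.ForEachAsync is available. LoopDemo, the item range, the degree of parallelism, and the simulated failure at item 5 are illustrative stand-ins for the real download and deserialization work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LoopDemo
{
    public static async Task<string> RunAsync(CancellationToken ct = default)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4, // bounds how many items are in flight
            CancellationToken = ct
        };

        try
        {
            // The body returns a ValueTask, so the loop awaits each iteration
            // and observes any exception it throws.
            await Parallel.ForEachAsync(Enumerable.Range(0, 100), options,
                async (i, token) =>
                {
                    await Task.Delay(10, token); // stands in for download + deserialize
                    if (i == 5)
                        throw new InvalidOperationException($"item {i} failed");
                });
            return "completed";
        }
        catch (InvalidOperationException ex)
        {
            // The failure propagates to the caller instead of being lost.
            return ex.Message;
        }
    }
}
```

Because the loop is awaited, the failure of a single item reaches the caller, and the token handed to the remaining iterations is cancelled. MaxDegreeOfParallelism also gives explicit control over how many large payloads are processed at once.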


    UPDATE #2

    After assessing your GetPollyResilienceStrategy code, I have two more pieces of advice:

    • Please change the return type of the method to IAsyncPolicy from AsyncPolicyWrap
      • AsyncPolicyWrap is an implementation detail and should not be exposed
    • Please change the order of timeout and circuit breaker
      • Policy.WrapAsync(retry, circuitBreaker, timeout);
      • In your setup, the CB will not break on a timeout
      • In my suggested setup, the CB could break on a TimeoutRejectedException as well
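Put together, the two suggestions could look like the sketch below. It reuses the values from the question unchanged and assumes Polly v7 plus the Polly.Contrib.WaitAndRetry package for Backoff. The enclosing class name Resilience is only there to make the snippet self-contained.

```csharp
using System;
using Polly;
using Polly.Contrib.WaitAndRetry;

public static class Resilience
{
    // Returns the interface rather than the concrete AsyncPolicyWrap type.
    public static IAsyncPolicy GetPollyResilienceStrategy()
    {
        var retry = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
                medianFirstRetryDelay: TimeSpan.FromMinutes(1),
                retryCount: 3));

        var circuitBreaker = Policy
            .Handle<Exception>()
            .AdvancedCircuitBreakerAsync(
                failureThreshold: 0.5,
                samplingDuration: TimeSpan.FromMinutes(10),
                minimumThroughput: 2,
                durationOfBreak: TimeSpan.FromMinutes(1));

        var timeout = Policy.TimeoutAsync(TimeSpan.FromMinutes(5));

        // Outermost first: retry wraps the breaker, which wraps the timeout,
        // so a TimeoutRejectedException passes through the breaker and is counted.
        return Policy.WrapAsync(retry, circuitBreaker, timeout);
    }
}
```

Since the breaker is stateful, the returned policy only tracks failures across items if the same instance is reused for them.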