Tags: c#, throttling, parallel.foreach, .net-6.0, parallel.foreachasync

Is it possible to throttle Parallel.ForEachAsync in .NET 6.0 to avoid rate limiting?


I'm fairly new to programming (less than 3 years of experience), so I don't have a great understanding of the subjects in this post. Please bear with me.

My team is developing an integration with a third party system, and one of the third party's endpoints lacks a meaningful way to get a list of entities matching a condition.

We have been fetching these entities by looping over the collection of requests and adding the result of each awaited call to a list. This works just fine, but it takes a lot longer than fetching entities from other endpoints that let us get a list of entities by providing a list of ids.

.NET 6.0 introduced Parallel.ForEachAsync(), which lets us execute multiple awaitable tasks asynchronously in parallel.

For example:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests) 
where TEntity : IEntity
{
    var entities = new ConcurrentBag<TEntity>();

    // Create a function that takes a RestRequest and returns the 
    // result of the request's execution, for each request
    var requestExecutionTasks = requests.Select(i => 
        new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));

    // Execute each of the functions asynchronously in parallel, 
    // and add the results to the aggregate as they come in
    await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
    {
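        // Caps how many requests are in flight at once (this is not a thread
        // count). For ForEachAsync, -1 means the default, which is
        // Environment.ProcessorCount, not unlimited
        MaxDegreeOfParallelism = -1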
    }, async (func, _) => entities.Add(await func()));

    return entities.ToList();
}
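To see what `MaxDegreeOfParallelism` actually bounds, here is a minimal self-contained sketch written as a .NET 6 console program with top-level statements. `FakeGetAsync` is a hypothetical stand-in for the post's `GetAsync<TEntity>`; the 20 ms delay, the 30 items and the cap of 4 are arbitrary. The loop counts how many simulated requests are awaiting a response at the same time, and the peak stays at or below the cap, because the option limits concurrent loop bodies (including ones that are only awaiting I/O) rather than threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GetAsync<TEntity>: a short simulated network call.
static async Task<int> FakeGetAsync(int id, CancellationToken cancellationToken)
{
    await Task.Delay(20, cancellationToken);
    return id;
}

var entities = new ConcurrentBag<int>();
object counterLock = new();
int inFlight = 0;    // requests currently awaiting a response
int maxInFlight = 0; // highest value inFlight ever reached

await Parallel.ForEachAsync(
    Enumerable.Range(1, 30),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (id, cancellationToken) =>
    {
        lock (counterLock)
        {
            inFlight++;
            maxInFlight = Math.Max(maxInFlight, inFlight);
        }
        try
        {
            entities.Add(await FakeGetAsync(id, cancellationToken));
        }
        finally
        {
            lock (counterLock) { inFlight--; }
        }
    });

Console.WriteLine($"Fetched {entities.Count} entities; peak concurrency was {maxInFlight}");
```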

Using GetEntitiesInParallelAsync instead of the simple foreach loop reduced the time it takes to get the ~30 entities on my test instance by 91% on average. That's awesome. However, we are worried about the rate limiting that is likely to occur when we use it on a client's system with possibly thousands of entities. We have a system in place that detects the "you are rate limited" message from their API and queues the requests for a second or so before trying again, but this is more of a safety net than a real solution.
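The retry safety net described above might look roughly like the following sketch. `RetryWhenRateLimitedAsync` is a hypothetical helper, and the demo assumes the rate-limit response surfaces as an `HttpRequestException` with status 429 (`TooManyRequests`). The post's client (which uses `IRestRequest`) may report it differently, which is why the rate-limit check is passed in as a predicate.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Runs an operation and, if it fails with a rate-limit error, waits `pause`
// and tries again, up to maxAttempts calls in total. Other exceptions (and the
// last rate-limit failure) propagate to the caller unchanged.
static async Task<T> RetryWhenRateLimitedAsync<T>(
    Func<Task<T>> operation,
    Func<Exception, bool> isRateLimited,
    TimeSpan pause,
    int maxAttempts)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await operation();
        }
        catch (Exception ex) when (attempt < maxAttempts && isRateLimited(ex))
        {
            await Task.Delay(pause); // back off before retrying the same request
        }
    }
}

// Demo: a fake request that is rate limited on its first two calls.
int calls = 0;
Task<string> FakeRequestAsync()
{
    calls++;
    if (calls <= 2)
        throw new HttpRequestException("rate limited", null, HttpStatusCode.TooManyRequests);
    return Task.FromResult("entity");
}

string result = await RetryWhenRateLimitedAsync(
    () => FakeRequestAsync(),
    // Assumption: a 429 status code is how the rate limit shows up.
    ex => ex is HttpRequestException { StatusCode: HttpStatusCode.TooManyRequests },
    TimeSpan.FromMilliseconds(50),
    maxAttempts: 5);

Console.WriteLine($"{result} after {calls} calls");
```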

If we were just looping over the requests, we could have throttled the calls by doing something like await Task.Delay(minimumDelay) in each iteration of the loop. Correct me if I'm wrong, but from what I understand this wouldn't work with Parallel.ForEachAsync, because every concurrently running request would wait the same amount of time before executing, so they would still be sent together. Is there a way to make each individual request wait a certain amount of time before execution, only when we are close to being rate limited? If at all possible, I would like to do this without limiting the number of threads used.
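For the sequential case mentioned above, a throttled loop might look like this minimal sketch. `GetSequentiallyThrottledAsync` is a hypothetical helper, and the lambda passed to it is a stand-in for `GetAsync<TEntity>`. Each request starts only after the previous one has finished and the delay has elapsed, so the spacing between calls is guaranteed; the same `Task.Delay` inside a `Parallel.ForEachAsync` body would instead postpone every concurrently running request by the same amount.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Sends the requests one at a time and waits minimumDelay between
// consecutive requests, so no two calls start closer together than that.
static async Task<List<TResult>> GetSequentiallyThrottledAsync<TRequest, TResult>(
    IEnumerable<TRequest> requests,
    Func<TRequest, Task<TResult>> getAsync,
    TimeSpan minimumDelay)
{
    var results = new List<TResult>();
    bool first = true;
    foreach (TRequest request in requests)
    {
        if (!first)
            await Task.Delay(minimumDelay); // pause before every call except the first
        first = false;
        results.Add(await getAsync(request));
    }
    return results;
}

var stopwatch = Stopwatch.StartNew();
List<int> entities = await GetSequentiallyThrottledAsync(
    Enumerable.Range(1, 5),
    id => Task.FromResult(id * 10), // hypothetical stand-in for GetAsync<TEntity>
    TimeSpan.FromMilliseconds(100));

Console.WriteLine($"{string.Join(", ", entities)} in {stopwatch.ElapsedMilliseconds} ms");
```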


Edit

I wanted to let this question sit a little so more people could answer. Since no new answers or comments have been added, I'm marking the one answer I got as correct. That being said, the answer suggests a different approach than using Parallel.ForEachAsync.

If I understand the current answer correctly, the answer to my original question of whether or not it's possible to throttle Parallel.ForEachAsync, would be: "no, it's not".


Solution

  • My suggestion is to ditch the Parallel.ForEachAsync approach, and use instead the new Chunk LINQ operator in combination with the Task.WhenAll method. You can launch 100 asynchronous operations every second like this:

    public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
        List<IRestRequest> requests) where TEntity : IEntity
    {
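        var tasks = new List<Task<TEntity>>();
        // Start the requests 100 at a time, pausing one second between batches
        foreach (var chunk in requests.Chunk(100))
        {
            // Launch every request in this batch without awaiting it yet
            tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
            await Task.Delay(TimeSpan.FromSeconds(1.0));
        }
        // Wait for all launched requests; the results keep the input order
        return (await Task.WhenAll(tasks)).ToList();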
    }
    

    It is assumed that the time required to launch an asynchronous operation (to invoke the GetAsync method) is negligible.

    This approach has the inherent disadvantage that, in case of an exception, the failure will not be propagated until all operations have completed. For comparison, the Parallel.ForEachAsync method stops invoking the async delegate and completes as soon as possible after the first failure is detected.
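The trade-off described in the answer can be observed with a small self-contained sketch. `RunInTimedBatchesAsync` is a hypothetical generalization of the answer's method (the operation is passed in instead of calling `GetAsync<TEntity>`), and unlike the answer's loop it does not pause after the final batch; the fake operations, batch size and interval are arbitrary. With item 1 failing immediately, the batching version still launches all seven operations before the exception surfaces, while `Parallel.ForEachAsync`, pinned to one worker so the count is deterministic, starts no further items after the first failure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical generalization of the answer's method: launches the operations
// batchSize at a time, pausing `interval` between batches (not after the last
// one), then awaits them all together with Task.WhenAll.
static async Task<TResult[]> RunInTimedBatchesAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> operation,
    int batchSize,
    TimeSpan interval)
{
    var tasks = new List<Task<TResult>>();
    bool first = true;
    foreach (TSource[] batch in source.Chunk(batchSize))
    {
        if (!first)
            await Task.Delay(interval);
        first = false;
        tasks.AddRange(batch.Select(operation)); // start the batch, don't await it
    }
    return await Task.WhenAll(tasks);
}

TimeSpan pause = TimeSpan.FromMilliseconds(50);

// 1) Normal use: results come back in input order.
int[] doubled = await RunInTimedBatchesAsync(
    Enumerable.Range(1, 7), x => Task.FromResult(x * 2), batchSize: 3, interval: pause);

// 2) Batching: item 1 fails right away, yet all later batches are still
//    launched before Task.WhenAll reports the exception.
int batchedStarted = 0;
try
{
    await RunInTimedBatchesAsync(Enumerable.Range(1, 7), async x =>
    {
        Interlocked.Increment(ref batchedStarted);
        await Task.Yield(); // simulate an asynchronous request
        if (x == 1) throw new InvalidOperationException("request failed");
        return x;
    }, batchSize: 3, interval: pause);
}
catch (InvalidOperationException) { }

// 3) Parallel.ForEachAsync with one worker (for a deterministic count):
//    no further items are started after the first body throws.
int parallelStarted = 0;
try
{
    await Parallel.ForEachAsync(Enumerable.Range(1, 7),
        new ParallelOptions { MaxDegreeOfParallelism = 1 },
        async (x, cancellationToken) =>
        {
            Interlocked.Increment(ref parallelStarted);
            await Task.Yield(); // simulate an asynchronous request
            if (x == 1) throw new InvalidOperationException("request failed");
        });
}
catch (InvalidOperationException) { }

Console.WriteLine($"Batched: {batchedStarted} started; ForEachAsync: {parallelStarted} started");
```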