c# multithreading async-await dotnet-httpclient producer-consumer

HttpClient.SendAsync processes two requests at a time when the limit is higher


I have a Windows service that reads data from the database and processes this data using multiple REST API calls.

Originally, this service ran on a timer where it would read unprocessed data from the database and process it using multiple threads limited using SemaphoreSlim. This worked well except that the database read had to wait for all processing to finish before reading again.

ServicePointManager.DefaultConnectionLimit = 10;

Original version, which works:

// Runs every 5 seconds on a timer
private void ProcessTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    var hasLock = false;
    try
    {
        Monitor.TryEnter(timerLock, ref hasLock);
        if (hasLock)
        {
            ProcessNewData();
        }
        else
        {
            log.Info("Failed to acquire lock for timer."); // This happens all of the time
        }
    }
    finally
    {
        if (hasLock)
        {
            Monitor.Exit(timerLock);
        }
    }
}

public void ProcessNewData()
{
    var unprocessedItems = GetDatabaseItems();

    if (unprocessedItems.Count > 0)
    {
        var downloadTasks = new Task[unprocessedItems.Count];
        var maxThreads = new SemaphoreSlim(semaphoreSlimMinMax, semaphoreSlimMinMax); // semaphoreSlimMinMax = 10 is max threads

        for (var i = 0; i < unprocessedItems.Count; i++)
        {
            maxThreads.Wait();
            var iClosure = i;
            downloadTasks[i] =
            Task.Run(async () =>
                {
                    try
                    {
                        await ProcessItemsAsync(unprocessedItems[iClosure]);
                    }
                    catch (Exception ex)
                    {
                        // handle exception
                    }
                    finally
                    {
                        maxThreads.Release();
                    }
                });
        }

        Task.WaitAll(downloadTasks);
    }
}

To improve efficiency, I rewrote the service so that GetDatabaseItems runs on a separate thread from the rest. A ConcurrentDictionary of unprocessed items sits between the two: GetDatabaseItems fills it and ProcessNewData empties it.

The problem is that while 10 unprocessed items are sent to ProcessItemsAsync, they are processed two at a time instead of all 10 at once.

The code inside of ProcessItemsAsync calls var response = await client.SendAsync(request); where the delay occurs. All 10 threads make it to this code but come out of it two at a time. None of this code changed between the old version and the new.

Here is the code in the new version that did change:

public void Start()
{
    ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads;  // 10

    // Start getting unprocessed data
    getUnprocessedDataTimer.Interval = getUnprocessedDataInterval; // 5 seconds
    getUnprocessedDataTimer.Elapsed += GetUnprocessedData; // writes data into a ConcurrentDictionary
    getUnprocessedDataTimer.Start();

    cancellationTokenSource = new CancellationTokenSource();

    // Create a new thread to process data
    Task.Factory.StartNew(() =>
       {
           try
           {
               ProcessNewData(cancellationTokenSource.Token);
           }
           catch (Exception ex)
           {
               // error handling
           }
       }, TaskCreationOptions.LongRunning
    );

}

private void ProcessNewData(CancellationToken token)
{
    // Check if task has been canceled.
    while (!token.IsCancellationRequested)
    {
        if (unprocessedDictionary.Count > 0)
        {
            try
            {
                var throttler = new SemaphoreSlim(maxSimultaneousThreads, maxSimultaneousThreads); // maxSimultaneousThreads = 10
                var tasks = unprocessedDictionary.Select(async item =>
                {
                    await throttler.WaitAsync(token);
                    try
                    {
                        // The out variable must not reuse the lambda parameter name "item".
                        if (unprocessedDictionary.TryRemove(item.Key, out var value))
                        {
                            await ProcessItemsAsync(value);
                        }
                    }
                    catch (Exception ex)
                    {
                        // handle error
                    }
                    finally
                    {
                        throttler.Release();
                    }
                });
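                // Block until the whole batch has finished; otherwise the loop moves on immediately.
                Task.WhenAll(tasks).GetAwaiter().GetResult();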
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }

        Thread.Sleep(1000);
    }
}
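
For reference, here is a minimal, self-contained sketch of the throttling pattern used above, with the batch actually awaited. It is an illustration under stated assumptions and not the service's real code: `ThrottleSketch`, `DrainAsync`, and `FakeProcessItemAsync` are hypothetical names, and the HTTP call is replaced by a short `Task.Delay`, so it only shows how many items the semaphore lets through at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical stand-in for ProcessItemsAsync: simulates the HTTP call with a delay.
    private static Task FakeProcessItemAsync(string item, CancellationToken token)
    {
        return Task.Delay(50, token);
    }

    // Removes and processes every key present at the start of the call, with at most
    // maxConcurrency items in flight. Returns the highest in-flight count observed.
    public static async Task<int> DrainAsync(
        ConcurrentDictionary<int, string> pending, int maxConcurrency, CancellationToken token)
    {
        var gate = new object();
        var inFlight = 0;
        var peak = 0;

        using (var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            // ToList() materializes the lazy Select so each task is created exactly once.
            var tasks = pending.Keys.ToList().Select(async key =>
            {
                await throttler.WaitAsync(token);
                try
                {
                    // The out variable needs a name different from the lambda parameter.
                    if (!pending.TryRemove(key, out var value)) return;

                    lock (gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
                    try { await FakeProcessItemAsync(value, token); }
                    finally { lock (gate) { inFlight--; } }
                }
                finally
                {
                    throttler.Release();
                }
            }).ToList();

            // Awaiting WhenAll is what makes the caller wait for the whole batch.
            await Task.WhenAll(tasks);
        }

        return peak;
    }

    public static void Main()
    {
        var pending = new ConcurrentDictionary<int, string>();
        for (var i = 0; i < 25; i++) pending[i] = "item " + i;

        var peak = DrainAsync(pending, 10, CancellationToken.None).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency: {0}, items left: {1}", peak, pending.Count);
    }
}
```

With a simulated delay the semaphore alone decides the concurrency. If a real HttpClient call still completes two at a time inside the same pattern, the limit is probably being applied somewhere below the semaphore, for example at the connection level.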

Environment

  • .NET Framework 4.7.1
  • Windows Server 2016
  • Visual Studio 2019

Attempts to fix:

I tried the following with the same bad result (two await client.SendAsync(request) completing at a time):

  • Set Max threads and ServicePointManager.DefaultConnectionLimit to 30
  • Manually create threads using Thread.Start()
  • Replace async/await pattern with sync HttpClient calls
  • Call data processing using Task.Run(async () => and Task.WaitAll(downloadTasks);
  • Replace the new long-running thread for ProcessNewData with a timer

What I want is to run GetUnprocessedData and ProcessNewData concurrently with an HttpClient connection limit of 10 (set in config) so that 10 requests are processed at the same time.

Note: the issue is similar to "HttpClient.GetAsync executes only 2 requests at a time?", but here DefaultConnectionLimit is increased and the service runs on Windows Server. The original code also creates more than 2 connections when it runs.

Update

I went back to the original project to make sure it still worked, and it did. I added a new timer to perform some unrelated operations, and the HttpClient issue came back. When I removed the timer, everything worked again. When I added a new thread to do parallel processing, the problem came back.


Solution

  • The issue turned out to be the place where ServicePointManager.DefaultConnectionLimit is set.

    In the version where HttpClient was only doing two requests at a time, ServicePointManager.DefaultConnectionLimit was being set before the threads were being created but after the HttpClient was initialized.

    Once I moved it into the constructor before the HttpClient is initialized, everything started working.

    Thank you very much to @Theodor Zoulias for the help.

    TL;DR: Set ServicePointManager.DefaultConnectionLimit before initializing the HttpClient.
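
A minimal sketch of that ordering, assuming .NET Framework as in the question. `ConnectionLimitSketch`, `CreateClient`, and the `https://api.example.com/` address are hypothetical and only for illustration. The documentation for `DefaultConnectionLimit` says that changing it affects only `ServicePoint` objects created after the change, which would be consistent with the behavior seen here. The sketch therefore sets the default first and then reads the limit back through `ServicePointManager.FindServicePoint`.

```csharp
using System;
using System.Net;
using System.Net.Http;

public static class ConnectionLimitSketch
{
    // Hypothetical endpoint; replace with the real API base address.
    private static readonly Uri ApiBase = new Uri("https://api.example.com/");

    public static HttpClient CreateClient(int maxConnections)
    {
        // Set the process-wide default first ...
        ServicePointManager.DefaultConnectionLimit = maxConnections;

        // ... and only then create the client that will use it.
        return new HttpClient { BaseAddress = ApiBase };
    }

    public static void Main()
    {
        var client = CreateClient(10);

        // Read back the limit that applies to this host (no request is sent here).
        var servicePoint = ServicePointManager.FindServicePoint(ApiBase);
        Console.WriteLine("Connection limit for {0}: {1}",
            ApiBase.Host, servicePoint.ConnectionLimit);

        // If a ServicePoint was created earlier with a lower limit, it can also be
        // raised directly instead of relying on the default.
        servicePoint.ConnectionLimit = 10;

        client.Dispose();
    }
}
```

Logging `servicePoint.ConnectionLimit` at startup is a cheap way to confirm which limit is actually in effect for the API host before digging into threading code.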