I have a Windows service that reads data from the database and processes this data using multiple REST API calls.
Originally, this service ran on a timer where it would read unprocessed data from the database and process it using multiple threads limited using SemaphoreSlim
. This worked well except that the database read had to wait for all processing to finish before reading again.
ServicePointManager.DefaultConnectionLimit = 10;
Original that works:
// Runs every 5 seconds on a timer
private void ProcessTimer_Elapsed(object sender, ElapsedEventArgs e)
{
var hasLock = false;
try
{
Monitor.TryEnter(timerLock, ref hasLock);
if (hasLock)
{
ProcessNewData();
}
else
{
log.Info("Failed to acquire lock for timer."); // This happens all of the time
}
}
finally
{
if (hasLock)
{
Monitor.Exit(timerLock);
}
}
}
public void ProcessNewData()
{
var unproceesedItems = GetDatabaseItems();
if (unproceesedItems.Count > 0)
{
var downloadTasks = new Task[unproceesedItems.Count];
var maxThreads = new SemaphoreSlim(semaphoreSlimMinMax, semaphoreSlimMinMax); // semaphoreSlimMinMax = 10 is max threads
for (var i = 0; i < unproceesedItems .Count; i++)
{
maxThreads.Wait();
var iClosure = i;
downloadTasks[i] =
Task.Run(async () =>
{
try
{
await ProcessItemsAsync(unproceesedItems[iClosure]);
}
catch (Exception ex)
{
// handle exception
}
finally
{
maxThreads.Release();
}
});
}
Task.WaitAll(downloadTasks);
}
}
To improve efficiency, I rewrite the service to run GetDatabaseItems
in a separate thread from the rest so that there is a ConcurrentDictionary
of unprocessed items between them that GetDatabaseItems
fills and ProcessNewData
empties.
The problem is that while 10 unprocessed items are send to ProcessItemsAsync
, they are processed two at a time instead of all 10.
The code inside of ProcessItemsAsync
calls var response = await client.SendAsync(request);
where the delay occurs. All 10 threads make it to this code but come out of it two at a time. None of this code changed between the old version and the new.
Here is the code in the new version that did change:
public void Start()
{
ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads; // 10
// Start getting unprocessed data
getUnprocessedDataTimer.Interval = getUnprocessedDataInterval; // 5 seconds
getUnprocessedDataTimer.Elapsed += GetUnprocessedData; // writes data into a ConcurrentDictionary
getUnprocessedDataTimer.Start();
cancellationTokenSource = new CancellationTokenSource();
// Create a new thread to process data
Task.Factory.StartNew(() =>
{
try
{
ProcessNewData(cancellationTokenSource.Token);
}
catch (Exception ex)
{
// error handling
}
}, TaskCreationOptions.LongRunning
);
}
private void ProcessNewData(CancellationToken token)
{
// Check if task has been canceled.
while (!token.IsCancellationRequested)
{
if (unprocessedDictionary.Count > 0)
{
try
{
var throttler = new SemaphoreSlim(maxSimultaneousThreads, maxSimultaneousThreads); // maxSimultaneousThreads = 10
var tasks = unprocessedDictionary.Select(async item =>
{
await throttler.WaitAsync(token);
try
{
if (unprocessedDictionary.TryRemove(item.Key, out var item))
{
await ProcessItemsAsync(item);
}
}
catch (Exception ex)
{
// handle error
}
finally
{
throttler.Release();
}
});
Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
break;
}
}
Thread.Sleep(1000);
}
}
I tried the following with the same bad result (two await client.SendAsync(request)
completing at a time):
ServicePointManager.DefaultConnectionLimit
to 30Thread.Start()
Task.Run(async () =>
and Task.WaitAll(downloadTasks);
ProcessNewData
with a timerWhat I want is to run GetUnprocessedData
and ProcessNewData
concurrently with an HttpClient connection limit of 10 (set in config) so that 10 requests are processed at the same time.
Note: the issue is similar to HttpClient.GetAsync executes only 2 requests at a time? but the DefaultConnectionLimit
is increased and the service runs on a Windows Server. It also creates more than 2 connections when original code runs.
I went back to the original project to make sure it still worked, it did. I added a new timer to perform some unrelated operations and the httpClient
issue came back. I removed the timer, everything worked. I added a new thread to do parallel processing, the problem came back.
The issue turned out to be the place where ServicePointManager.DefaultConnectionLimit
is set.
In the version where HttpClient
was only doing two requests at a time, ServicePointManager.DefaultConnectionLimit
was being set before the threads were being created but after the HttpClient
was initialized.
Once I moved it into the constructor before the HttpClient
is initialized, everything started working.
Thank you very much to @Theodor Zoulias for the help.
TLDR; Set ServicePointManager.DefaultConnectionLimit
before initializing the HttpClient
.