I have a list of IDs and I want to get data for each ID in parallel from a database. My ExecuteAsync method below is called at very high throughput, and each request carries around 500 IDs for which I need to extract data.
So I have the code below, where I loop over the list of IDs and make an async call for each ID in parallel. It works fine.
    private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
        Func<CancellationToken, int, Task<T>> mapper) where T : class
    {
        var tasks = new List<Task<T>>(ids.Count);
        // invoke the mapper for every id in parallel to get its data from the database
        for (int i = 0; i < ids.Count; i++)
        {
            // local copy so the lambda does not capture the shared loop variable i
            var id = ids[i];
            tasks.Add(Execute(policy, ct => mapper(ct, id)));
        }
        // wait for all responses to come back
        var responses = await Task.WhenAll(tasks);
        var excludeNull = new List<T>(ids.Count);
        for (int i = 0; i < responses.Length; i++)
        {
            var response = responses[i];
            if (response != null)
            {
                excludeNull.Add(response);
            }
        }
        return excludeNull;
    }

    private async Task<T> Execute<T>(IPollyPolicy policy,
        Func<CancellationToken, Task<T>> requestExecuter) where T : class
    {
        var response = await policy.Policy.ExecuteAndCaptureAsync(
            ct => requestExecuter(ct), CancellationToken.None);
        if (response.Outcome == OutcomeType.Failure)
        {
            if (response.FinalException != null)
            {
                // log error
                throw response.FinalException;
            }
        }
        return response?.Result;
    }
Question:
As you can see, I loop over all the IDs and make a bunch of async calls to the database in parallel, one per ID, which can put a lot of load on the database (depending on how many requests are coming in). So I want to limit the number of async calls we make to the database. I modified ExecuteAsync to use a semaphore, as shown below, but it doesn't look like it does what I want:
    private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
        Func<CancellationToken, int, Task<T>> mapper) where T : class
    {
        var throttler = new SemaphoreSlim(250);
        var tasks = new List<Task<T>>(ids.Count);
        // invoke the mapper for every id in parallel to get its data from the database
        for (int i = 0; i < ids.Count; i++)
        {
            // local copy so the lambda does not capture the shared loop variable i
            var id = ids[i];
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                tasks.Add(Execute(policy, ct => mapper(ct, id)));
            }
            finally
            {
                throttler.Release();
            }
        }
        // wait for all responses to come back
        var responses = await Task.WhenAll(tasks);
        // same excludeNull code check here
        return excludeNull;
    }
Does Semaphore work on threads or tasks? Reading it here, it looks like Semaphore is for threads and SemaphoreSlim is for tasks. Is this correct? If so, what's the best way to fix this and limit the number of async I/O tasks we make to the database?
> Task is an abstraction on threads, and doesn't necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task, which isn't a thread. If there's only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. The 1 thread sends off all the async IO tasks itself.
Sort of. I would not say that tasks are related to threads at all. There are actually two kinds of tasks: a delegate task (which is kind of an abstraction of a thread), and a promise task (which has nothing to do with threads).
Regarding SemaphoreSlim, it does limit the concurrency of a block of code (not threads).
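To make the distinction between the two kinds of tasks concrete, here is a minimal sketch. The class and method names (TaskKinds, DelegateTask, PromiseTask) are made up for illustration and are not part of the code in the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskKinds
{
    // Delegate task: wraps code that is executed on a thread-pool thread.
    public static Task<int> DelegateTask() =>
        Task.Run(() => Thread.CurrentThread.ManagedThreadId);

    // Promise task: only represents a future completion. No thread runs
    // "inside" it; it completes when someone calls SetResult on the source.
    public static Task<int> PromiseTask(out TaskCompletionSource<int> source)
    {
        source = new TaskCompletionSource<int>();
        return source.Task;
    }
}
```

The task returned by PromiseTask stays incomplete, without occupying any thread, until the caller invokes source.SetResult(...). Asynchronous I/O tasks, such as database calls, are of this second kind.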
> I recently started playing with C#, so it looks like my understanding of threads and tasks is not right.
I recommend reading my async intro and best practices. Follow up with "There Is No Thread" if you're interested in more about how threads aren't really involved.
> I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do
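The current code only throttles adding the tasks to the list, which happens one at a time anyway: the semaphore is released as soon as Execute returns its not-yet-completed task, not when the database call finishes. What you want to do is throttle the execution itself: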
    private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
        Func<CancellationToken, int, Task<T>> mapper) where T : class
    {
        var throttler = new SemaphoreSlim(250);
        var tasks = new List<Task<T>>(ids.Count);
        // invoking multiple id in parallel to get data for each id from database
        for (int i = 0; i < ids.Count; i++)
            tasks.Add(ThrottledExecute(ids[i]));
        // wait for all id response to come back
        var responses = await Task.WhenAll(tasks);
        // same excludeNull code check here
        return excludeNull;

        async Task<T> ThrottledExecute(int id)
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        }
    }
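If you want to check the throttling behavior without a database, a self-contained sketch along these lines may help. It uses the same pattern (acquire the semaphore inside the async method, release it in finally) but replaces the real call with Task.Delay. ThrottleDemo, MaxInFlightAsync and FakeDatabaseCallAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Starts callCount simulated calls and returns the highest number
    // that were ever in flight at the same time.
    public static async Task<int> MaxInFlightAsync(int callCount, int limit)
    {
        var throttler = new SemaphoreSlim(limit);
        var gate = new object();
        int inFlight = 0, maxInFlight = 0;

        async Task FakeDatabaseCallAsync()
        {
            // The semaphore is held for the whole duration of the call.
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                lock (gate)
                {
                    inFlight++;
                    maxInFlight = Math.Max(maxInFlight, inFlight);
                }
                // Stand-in for the real asynchronous database request.
                await Task.Delay(20).ConfigureAwait(false);
                lock (gate) { inFlight--; }
            }
            finally
            {
                throttler.Release();
            }
        }

        var tasks = new List<Task>(callCount);
        for (int i = 0; i < callCount; i++)
            tasks.Add(FakeDatabaseCallAsync());

        await Task.WhenAll(tasks).ConfigureAwait(false);
        return maxInFlight;
    }
}
```

Calling await ThrottleDemo.MaxInFlightAsync(50, 5) should return a value no greater than 5, because every simulated call holds the semaphore until it has finished.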