I'm trying to implement a self-hosted web service using ASP.NET Core 2.1 and got stuck implementing long-running background tasks.
Because each call to the ProcessSingle method (in the code snippet below) is CPU-heavy and time-consuming, I would like to limit the number of tasks running simultaneously. But as far as I can see, all the tasks in Parallel.ForEach start almost immediately, even though I set MaxDegreeOfParallelism = 3.
My code (simplified version):
public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB
    // run background task, don't wait for it to finish
    Task.Factory.StartNew(async () =>
    {
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            async x => await ProcessSingle(x));
    });
    // return created id immediately
    return id;
}
public static async Task ProcessSingle(MyInputData inputData)
{
    var dbData = await GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    await SaveDataToDb(); // async save processed data to DB using Dapper
}
If I understand correctly, the problem is the async x => await ProcessSingle(x) lambda inside Parallel.ForEach, isn't it?
Could someone please describe how to implement this correctly?
Update
Since my question was somewhat ambiguous, here are the main points.
The ProcessSingle method has three parts:
1. get data from the DB (async)
2. run long, CPU-heavy math calculations
3. save the results to the DB (async)
The problem has two separate parts:
1. How do I reduce CPU usage (for example, by running no more than three math calculations simultaneously)?
2. How do I keep the structure of the ProcessSingle method, i.e. keep it async because of the async DB calls?
I hope this is clearer now.
P.S. A suitable answer has already been given and it works (special thanks to @MatrixTai). This update is only for general clarification.
Update
As you mentioned in a comment, the problem is caused by the math calculation. It is better to separate the calculation from the DB update.
For the calculation part, use Parallel.ForEach(): it is meant for synchronous, CPU-bound work, and MaxDegreeOfParallelism lets you control how many calculations run at once.
Only after all of these calculations have finished, use async-await to write your data to the DB, without the SemaphoreSlim described below.
public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB
    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        // Calculation part: synchronous lambda, at most 3 calculations at a time
        var data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => data.Add(calculationPart(x)));
        // Update DB part: start all async writes, then wait for them together
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(i));
        }
        await Task.WhenAll(worker);
    });
    // return created id immediately
    return id;
}
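If you would rather keep ProcessSingle as one async method (the second point of the question's update), a different technique can be swapped in: throttle only the CPU-bound step with a SemaphoreSlim and leave the DB calls unthrottled. This is a minimal sketch, not what the answer above does; ThrottledCalculation, ProcessSingleAsync and Calculate are hypothetical names, the DB calls are simulated with Task.Delay, and the limit of 3 is taken from the question. The peak counter only exists to show that no more than three calculations overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledCalculation
{
    // At most 3 CPU-heavy calculations at a time (limit assumed from the question).
    private static readonly SemaphoreSlim CalcGate = new SemaphoreSlim(3);

    private static int _running;      // calculations currently running
    private static int _peakRunning;  // highest value _running has reached

    public static int PeakRunning => Volatile.Read(ref _peakRunning);

    // Hypothetical stand-in for ProcessSingle: DB calls are simulated with Task.Delay.
    public static async Task<long> ProcessSingleAsync(int input)
    {
        await Task.Delay(10);            // simulated GetDataFromDb (not throttled)
        await CalcGate.WaitAsync();      // wait for one of the 3 calculation slots
        long result;
        try
        {
            // Run the synchronous, CPU-bound step on a thread-pool thread.
            result = await Task.Run(() => Calculate(input));
        }
        finally
        {
            CalcGate.Release();          // free the slot even if Calculate throws
        }
        await Task.Delay(10);            // simulated SaveDataToDb (not throttled)
        return result;
    }

    // Hypothetical CPU-heavy calculation that also records how many run at once.
    private static long Calculate(int input)
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        while (now > (seen = Volatile.Read(ref _peakRunning)))
            Interlocked.CompareExchange(ref _peakRunning, now, seen);

        long sum = 0;
        for (int i = 0; i < 2000000; i++) sum += (i ^ input) & 7;

        Interlocked.Decrement(ref _running);
        return sum;
    }

    // Starts every item at once; only the calculation step is limited.
    public static Task<long[]> ProcessAllAsync(IEnumerable<int> inputs) =>
        Task.WhenAll(inputs.Select(x => ProcessSingleAsync(x)));
}
```

The design choice here is that DB I/O does not consume CPU while it is pending, so only the calculation needs a limit.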
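Of course they all start together: you use async-await inside Parallel.ForEach. Parallel.ForEach expects a synchronous Action<T>, so the async lambda becomes an async void method. Each iteration hands control back to Parallel.ForEach at its first await, so MaxDegreeOfParallelism = 3 only limits the code before that await, and the rest of every ProcessSingle call keeps running after the loop has moved on.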
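The effect can be observed with a small sketch that counts how many iterations are "in flight" at once. The class and method names are hypothetical, Task.Delay stands in for the async DB call, and Thread.Sleep stands in for synchronous work; the async variant should report a peak above 3, while the synchronous one stays at 3 or below.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaInParallelForEach
{
    // Tracks the current and peak number of simultaneously running bodies.
    private sealed class Counter
    {
        private int _current, _peak;
        public int Peak => Volatile.Read(ref _peak);
        public void Enter()
        {
            int now = Interlocked.Increment(ref _current);
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)))
                Interlocked.CompareExchange(ref _peak, now, seen);
        }
        public void Exit() => Interlocked.Decrement(ref _current);
    }

    // The async lambda is converted to an async void Action<int>: each iteration
    // "finishes" at the await, so the limit of 3 does not cover the awaited work.
    public static int PeakWithAsyncLambda(int items)
    {
        var counter = new Counter();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
        Parallel.ForEach(Enumerable.Range(0, items), options, async x =>
        {
            counter.Enter();
            await Task.Delay(200);   // simulated async DB call
            counter.Exit();
        });
        Thread.Sleep(400);           // let the orphaned async void continuations finish
        return counter.Peak;
    }

    // A synchronous lambda really runs inside the loop, so at most 3 run at once.
    public static int PeakWithSyncLambda(int items)
    {
        var counter = new Counter();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
        Parallel.ForEach(Enumerable.Range(0, items), options, x =>
        {
            counter.Enter();
            Thread.Sleep(50);        // simulated synchronous work
            counter.Exit();
        });
        return counter.Peak;
    }
}
```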
First, read both the first and second answers to this question. Combining these two (Parallel.ForEach and async-await) is pointless.
async-await, on the other hand, frees the thread while each DB call is pending, so for the async work you can simply start all the tasks and await them together:
public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB
    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(i));
        }
        await Task.WhenAll(worker);
    });
    // return created id immediately
    return id;
}
But this creates another problem: all the tasks still start together and eat up your CPU.
To avoid this, use SemaphoreSlim to limit how many run at once:
public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB
    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        List<Task> worker = new List<Task>();
        // To limit the number of tasks running at once
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessSingle(i);
                }
                finally
                {
                    // release the slot even if ProcessSingle throws
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(worker);
    });
    // return created id immediately
    return id;
}
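The same SemaphoreSlim pattern can be pulled out into a reusable helper and checked in isolation. This is a sketch under assumptions: Throttled, ForEachAsync and MeasurePeakAsync are hypothetical names, and Task.Delay simulates ProcessSingle. MeasurePeakAsync reports the highest number of calls that were running at the same time, which should never exceed the limit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Runs work(item) for every item, with at most maxConcurrency calls in flight.
    public static async Task ForEachAsync<T>(IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        using (var throttler = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();
            foreach (var item in items)
            {
                await throttler.WaitAsync();            // wait for a free slot
                tasks.Add(Task.Run(async () =>
                {
                    try { await work(item); }
                    finally { throttler.Release(); }    // free the slot even on failure
                }));
            }
            await Task.WhenAll(tasks);                  // all slots released before Dispose
        }
    }

    // Returns the highest number of simultaneous calls observed with the given limit.
    public static async Task<int> MeasurePeakAsync(int items, int limit)
    {
        var counters = new int[2]; // [0] = currently running, [1] = peak
        await ForEachAsync(Enumerable.Range(0, items), limit, async _ =>
        {
            int now = Interlocked.Increment(ref counters[0]);
            int seen;
            while (now > (seen = Volatile.Read(ref counters[1])))
                Interlocked.CompareExchange(ref counters[1], now, seen);
            await Task.Delay(20);                       // simulated async work
            Interlocked.Decrement(ref counters[0]);
        });
        return counters[1];
    }
}
```

For example, Throttled.MeasurePeakAsync(20, 3) should report a peak of at most 3, even though 20 items are processed.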
Read more in "How to limit the amount of concurrent async I/O operations?".
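Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough for the work you want to do. With an async lambda, StartNew returns a Task<Task> whose outer task completes at the lambda's first await, while Task.Run unwraps it for you. Read this excellent article by Stephen Cleary.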
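A short sketch of the difference, assuming a hypothetical StartNewVsRun class and Task.Delay(200) as the async work: StartNew produces a nested Task<Task>, Task.Run produces a single Task that tracks the whole async lambda, and TaskExtensions.Unwrap() can flatten the StartNew result if you must use it.

```csharp
using System.Threading.Tasks;

public static class StartNewVsRun
{
    // StartNew picks its Func<TResult> overload with TResult = Task, so the result is
    // Task<Task>: the outer task completes when the lambda reaches its first await.
    public static Task<Task> WithStartNew() =>
        Task.Factory.StartNew(async () => await Task.Delay(200));

    // Task.Run has a Func<Task> overload that unwraps automatically, so the returned
    // task completes only when the whole async lambda has finished.
    public static Task WithRun() =>
        Task.Run(async () => await Task.Delay(200));

    // Unwrap() turns the Task<Task> into a proxy for the inner task.
    public static Task WithStartNewUnwrapped() =>
        Task.Factory.StartNew(async () => await Task.Delay(200)).Unwrap();
}
```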