Tags: c#, async-await, task, task-parallel-library, parallel.foreach

Limit the maximum number of concurrent operations with Parallel.ForEach and an async Action


I'm trying to implement a self-hosted web service using ASP.NET Core 2.1 and got stuck on implementing long-running background tasks.

Because each ProcessSingle call (in the code snippet below) is CPU-heavy and slow, I would like to limit the number of tasks executing simultaneously. But as far as I can see, all tasks in Parallel.ForEach start almost immediately, even though I set MaxDegreeOfParallelism = 3.

My code is (it's a simplified version):

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait when it finishes
    Task.Factory.StartNew(async () => {
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            async x => await ProcessSingle(x));
    });

    // return created id immediately
    return id;
}

public static async Task ProcessSingle(MyInputData inputData)
{
    var dbData = await GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    await SaveDataToDb(); // async save processed data to DB using Dapper
}

If I understand correctly, the problem is in async x => await ProcessSingle(x) inside Parallel.ForEach, isn't it?

Could someone please describe how this should be implemented correctly?

Update

Because my question was somewhat ambiguous, here are the main points:

  1. There are three parts in the ProcessSingle method:

    • get data from the DB (async)

    • perform long-running, CPU-heavy math calculations

    • save the results to the DB (async)

  2. The problem consists of two separate parts:

    • How to decrease CPU usage (for example, by running no more than three math calculations simultaneously)?

    • How to keep the structure of the ProcessSingle method, i.e. keep it async because of the async DB calls?

I hope this is clearer now.

P.S. A suitable answer has already been given and it works (special thanks to @MatrixTai). This update was written for general clarification.


Solution

  • Update

    As I just noticed from your comment, the problem is caused by the math calculation.

    It is better to separate the calculation from the DB update.

    For the calculation part, use Parallel.ForEach() so that the work is parallelized and you can control the number of threads.

    Only after all of these calculations have finished, use async-await to write the results to the DB, without the SemaphoreSlim described further down.

    public static async Task<int> Work()
    {
        var id = await CreateIdInDB(); // async create record in DB
    
        // run background task, don't wait until it finishes
        Task.Run(async () => {
    
            // Calculation part: synchronous body, so MaxDegreeOfParallelism is honored
            ConcurrentBag<int> data = new ConcurrentBag<int>();
            Parallel.ForEach(
                listOfData,
                new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
                x => { data.Add(calculationPart(x)); });
    
            // Update DB part: start all saves, then wait for them together
            int[] data_arr = data.ToArray();
            List<Task> worker = new List<Task>();
            foreach (var i in data_arr)
            {
                worker.Add(DBPart(i));
            }
            await Task.WhenAll(worker);
        });
    
        // return created id immediately
        return id;
    }
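The split described above can be tried in isolation. The following is only a sketch under assumptions: `Calculate` and `SaveAsync` are hypothetical stand-ins for the real math and the Dapper call, and the peak counter exists purely to observe how many loop bodies run at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SplitWorkDemo
{
    // Hypothetical CPU-bound step (stand-in for the math calculation).
    static int Calculate(int x) { Thread.SpinWait(200_000); return x * x; }

    // Hypothetical async save (stand-in for the DB write).
    static Task SaveAsync(int value) => Task.Delay(10);

    // Returns the highest number of concurrent calculations observed
    // and the number of results that were saved.
    public static async Task<(int peak, int saved)> RunAsync(int[] input)
    {
        int running = 0, peak = 0;
        var gate = new object();
        var results = new ConcurrentBag<int>();

        // Synchronous body: the loop really does cap concurrency at 3.
        Parallel.ForEach(input, new ParallelOptions { MaxDegreeOfParallelism = 3 }, x =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            results.Add(Calculate(x));
            lock (gate) { running--; }
        });

        // Async part: start every save, then await them all together.
        await Task.WhenAll(results.Select(v => SaveAsync(v)));
        return (peak, results.Count);
    }

    public static async Task Main()
    {
        var (peak, saved) = await RunAsync(Enumerable.Range(1, 20).ToArray());
        Console.WriteLine($"peak={peak}, saved={saved}");
    }
}
```

Because the loop body is synchronous here, the observed peak should never exceed 3, while the saves are free to overlap.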
    

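    Of course they all start together, because you use async-await inside Parallel.ForEach. The loop accepts an Action<T>, so the async lambda becomes an async void method, and the loop treats each iteration as finished as soon as it reaches its first await.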
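A minimal sketch of that effect, assuming nothing beyond the base class library (`AsyncVoidDemo` and the one-second delay are illustrative, the delay standing in for an async DB call): it counts how many loop bodies have actually finished at the moment Parallel.ForEach returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many loop bodies had completed when Parallel.ForEach returned.
    public static int CompletedWhenLoopReturns()
    {
        int completed = 0;
        Parallel.ForEach(
            new[] { 1, 2, 3, 4, 5, 6 },
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            async x =>                          // converted to Action<int>, i.e. async void
            {
                await Task.Delay(1000);         // the loop stops tracking this body here
                Interlocked.Increment(ref completed);
            });
        return Volatile.Read(ref completed);
    }

    public static void Main()
    {
        Console.WriteLine(CompletedWhenLoopReturns());
    }
}
```

On an otherwise idle machine this is expected to report 0: the loop returns long before any delay elapses, so MaxDegreeOfParallelism only limits how many bodies are started at once, not how many are in flight.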

    First, read both the first and the second answer to this question. Combining the two approaches is meaningless.

    async-await already makes efficient use of the available threads, so simply use it on its own.

    public static async Task<int> Work()
    {
        var id = await CreateIdInDB(); // async create record in DB
    
        // run background task, don't wait until it finishes
        Task.Run(async () => {
            List<Task> worker = new List<Task>();
            foreach (var i in listOfData)
            {
                // start every item without waiting; nothing limits concurrency here
                worker.Add(ProcessSingle(i));
            }
            await Task.WhenAll(worker);
        });
    
        // return created id immediately
        return id;
    }
    

    But this leads to another problem: the tasks still all start together and eat up your CPU.

    To avoid this, use SemaphoreSlim:

    public static async Task<int> Work()
    {
        var id = await CreateIdInDB(); // async create record in DB
    
        // run background task, don't wait until it finishes
        Task.Run(async () => {
            List<Task> worker = new List<Task>();
            // Limits the number of tasks running at once (20 here; pick the limit you need).
            var throttler = new SemaphoreSlim(initialCount: 20);
            foreach (var i in listOfData)
            {
                // waits here while the maximum number of tasks is already running
                await throttler.WaitAsync();
                worker.Add(Task.Run(async () =>
                {
                    try
                    {
                        await ProcessSingle(i);
                    }
                    finally
                    {
                        // release the slot even if ProcessSingle throws
                        throttler.Release();
                    }
                }));
            }
            await Task.WhenAll(worker);
        });
    
        // return created id immediately
        return id;
    }
    

    Read more: How to limit the amount of concurrent async I/O operations?
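The SemaphoreSlim throttling pattern can be checked on its own with a small sketch. The helper name `RunThrottledAsync`, the limit of 3, and the `Task.Delay` workload are assumptions made for illustration; the counter only records the highest concurrency that actually occurred.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs `work` for every item with at most `limit` invocations in flight
    // and returns the highest concurrency that was observed.
    public static async Task<int> RunThrottledAsync(
        IEnumerable<int> items, int limit, Func<int, Task> work)
    {
        int running = 0, peak = 0;
        var gate = new object();
        using (var throttler = new SemaphoreSlim(limit))
        {
            var tasks = new List<Task>();
            foreach (var item in items)
            {
                await throttler.WaitAsync();    // waits while `limit` items are in flight
                tasks.Add(Task.Run(async () =>
                {
                    lock (gate) { running++; peak = Math.Max(peak, running); }
                    try
                    {
                        await work(item);
                    }
                    finally
                    {
                        lock (gate) { running--; }
                        throttler.Release();    // free the slot even on failure
                    }
                }));
            }
            await Task.WhenAll(tasks);
        }
        return peak;
    }

    public static async Task Main()
    {
        int peak = await RunThrottledAsync(Enumerable.Range(0, 10), 3, _ => Task.Delay(50));
        Console.WriteLine(peak <= 3); // prints True
    }
}
```

Unlike MaxDegreeOfParallelism with an async lambda, the semaphore slot is held until the awaited work has completed, so the limit covers the whole asynchronous operation.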

    Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough for the work you want; read this excellent article by Stephen Cleary.
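One concrete difference that matters with async lambdas: Task.Factory.StartNew returns a Task<Task> whose outer task completes at the lambda's first await, whereas Task.Run unwraps the inner task. The sketch below illustrates this with hypothetical names (`StartNewDemo`, `CompareAsync`) and a 300 ms delay standing in for real async work.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Reports whether the async body had finished once each call was awaited.
    public static async Task<(bool startNewFinished, bool runFinished)> CompareAsync()
    {
        bool a = false, b = false;

        // Awaiting the outer Task<Task> only waits until the lambda reaches its first await.
        Task inner = await Task.Factory.StartNew(async () => { await Task.Delay(300); a = true; });
        bool startNewFinished = a;

        // Task.Run unwraps the inner task, so this await covers the whole body.
        await Task.Run(async () => { await Task.Delay(300); b = true; });
        bool runFinished = b;

        await inner; // let the first body finish before returning
        return (startNewFinished, runFinished);
    }

    public static async Task Main()
    {
        var (startNewFinished, runFinished) = await CompareAsync();
        Console.WriteLine($"StartNew finished: {startNewFinished}, Run finished: {runFinished}");
    }
}
```

The second value is always true; the first is expected to be false, since the delay has barely started when the outer task completes.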