c# asynchronous concurrency task task-parallel-library

Execute a limited number of Tasks concurrently


Given an IEnumerable<Func<Task>> that generates (crawler) Tasks which I want to execute concurrently, how can I set an upper bound on the concurrency?

For example, I do not want more than 5 of these Tasks running at the same time. On the other hand, there should always be 5 Tasks running if possible.

My current approach is this code:

    public static async Task ExecuteConcurrent(IEnumerable<Func<Task>> taskGenerators, int maxDegreeOfConcurrency)
    {
        var executingTasks = new HashSet<Task>();
        foreach (var taskGenerator in taskGenerators) {
            while (executingTasks.Count >= maxDegreeOfConcurrency) {
                executingTasks.Remove(await Task.WhenAny(executingTasks));
            }

            executingTasks.Add(taskGenerator());
        }

        await Task.WhenAll(executingTasks);
    }

Is there a better way to do this? Perhaps a built-in method is already available?

Thank you


Solution

  • Sounds like a job for TPL Dataflow.

    The advantages are:

    1. It works well with async and await, and with both CPU-bound and IO-bound workloads.
    2. You can limit concurrency with MaxDegreeOfParallelism, and there are many other options.
    3. You can chain blocks into more complicated pipelines.
    4. It works well with Reactive Extensions.
    5. It's all built for you by Mr. Parallel, Stephen Toub.

    NuGet package: System.Threading.Tasks.Dataflow


    Very basic example

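    using System.Collections.Generic;
    using System.Net;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static async Task DoWorkLoads(List<IPAddress> addresses)
    {
       var options = new ExecutionDataflowBlockOptions
                         {
                            MaxDegreeOfParallelism = 50 // limit here
                         };

       // The block's item type must match what is posted (IPAddress here)
       var block = new ActionBlock<IPAddress>(MyMethodAsync, options);

       foreach (var ip in addresses)
          block.Post(ip);

       // Signal that no more items will be posted, then wait for the queued work to finish
       block.Complete();
       await block.Completion;
    }

    ...

    // Static so that the static DoWorkLoads above can reference it
    public static async Task MyMethodAsync(IPAddress ip)
    {
        // await something here
    }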
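
To connect this back to the question, here is a minimal sketch of how the same ActionBlock idea might be applied to the original ExecuteConcurrent signature. The class name ConcurrencyLimiter is hypothetical, and setting BoundedCapacity together with SendAsync is an assumption on my part rather than something the answer above prescribes. It is there so that a lazy IEnumerable is only enumerated as fast as the block can accept work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, named for illustration only.
public static class ConcurrencyLimiter
{
    public static async Task ExecuteConcurrent(
        IEnumerable<Func<Task>> taskGenerators, int maxDegreeOfConcurrency)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            // At most this many delegates are processed at the same time.
            MaxDegreeOfParallelism = maxDegreeOfConcurrency,
            // Assumption: also bound the block's capacity so that the
            // source sequence is not consumed all at once.
            BoundedCapacity = maxDegreeOfConcurrency
        };

        // Each posted item is a task generator; the block invokes it
        // and treats the item as done when the returned Task completes.
        Func<Func<Task>, Task> run = generator => generator();
        var block = new ActionBlock<Func<Task>>(run, options);

        foreach (var taskGenerator in taskGenerators)
        {
            // SendAsync waits while the block is full. It returns false
            // if the block no longer accepts items (for example, after
            // it has faulted), in which case we stop feeding it.
            if (!await block.SendAsync(taskGenerator))
                break;
        }

        block.Complete();
        // Rethrows if any generated Task failed.
        await block.Completion;
    }
}
```

One behavioral difference to be aware of: if a generated Task throws, the block faults and stops accepting further items, and the exception surfaces when awaiting block.Completion. In the question's loop, by contrast, a failed Task is removed from the set without being observed unless it is still pending when Task.WhenAll runs. Whether that matters depends on how the crawler tasks handle their own errors.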