.net, task-parallel-library, parallel.foreach, taskscheduler

How to dynamically limit concurrent execution of tasks created by Parallel.Foreach


I'm creating an application which executes commands against a set of resources. A resource can be a server, for example. A task could be "ping" or "defragment database index" (these are examples, as I cannot reveal the true nature of the application). The application executes these commands concurrently using the TPL (Parallel.ForEach).
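For illustration, the current execution is roughly equivalent to the following sketch (the commands collection and its Execute method are placeholder names, not the real application code):

    //All commands run concurrently, limited only by the default degree of parallelism
    Parallel.ForEach(commands, command =>
    {
        command.Execute(); //e.g. ping a server or defragment an index
    });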

Now I've come to a point where I have to limit the concurrent execution of commands against resources that share a common underlying resource. As an example, here's a list of DB indexes which have to be defragmented.

Task# | Server | DB  | Index Name
---------------------------------
T1    | 1      | DB1 | A
T2    | 1      | DB1 | B
T3    | 1      | DB1 | C
T4    | 1      | DB2 | D
T5    | 2      | DB3 | E
T6    | 2      | DB3 | F
T7    | 3      | DB4 | G
T8    | 4      | DB5 | H
T9    | 6      | DB6 | I

To prevent massive parallel disk I/O on the servers, I have to limit the execution of the index defragmentation tasks to one per server. That means tasks T7, T8 and T9 can run at the same time, but only ONE of the tasks T1-T4 may run at a time, and the same goes for tasks T5 and T6. It should also be possible to limit the concurrent execution to, for example, 2 tasks per resource. How can I achieve this?

I've had a look at the TaskScheduler class, but it seems that a TaskScheduler cannot reject tasks (QueueTask method). A custom Partitioner wouldn't get me there either.

The only solution I can think of is to create my own implementation of Parallel.ForEach which takes care of the limitation entirely.

Does anyone have a better idea?

As mentioned above, this is an example use case. Therefore, please do not post answers like: Create a SQL Agent job for this.


Solution

  • The example isn't the most appropriate. Trying to defrag multiple indexes concurrently will result in worse performance, as the defrag operations will compete for the same CPU and disk resources. The only way for concurrent defragmentation operations to run faster would be for each index to be stored on a different disk (array).

    You might as well write all defrag statements in a single script and run it.

    In the general case where you want to queue some jobs/operations for concurrent execution with limited parallelism, the appropriate library is TPL Dataflow and its ActionBlock class. An ActionBlock lets you post messages to the block for processing. By default, only one message is processed at a time; to process more messages concurrently, pass a higher MaxDegreeOfParallelism through the ExecutionDataflowBlockOptions used when creating the block.

    var dopOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDop
    };
    var defragBlock = new ActionBlock<string>(indexName => MyDefragMethod(indexName), dopOptions);
    
    //Post all the indexes. Only maxDop will be processed at a time
    foreach(var indexName in indexesList)
    {
        defragBlock.Post(indexName);
    }
    
    //Notify the block that we are done
    defragBlock.Complete();
    
    //and wait for all remaining indexes to finish
    await defragBlock.Completion;
    

    The Parallel methods aren't suitable for job processing scenarios; they are meant for data parallelism, i.e. processing large amounts of data by partitioning it and running one task per partition. The number of partitions is roughly the same as the number of cores.
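    For comparison, the closest the Parallel methods get is a single global limit through ParallelOptions, which still cannot express a per-server cap (a minimal sketch, reusing the MyDefragMethod placeholder from above):

    var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };
    
    //This caps the total number of concurrent iterations,
    //but not the number of concurrent defrags per server
    Parallel.ForEach(indexesList, parallelOptions, indexName => MyDefragMethod(indexName));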

    The Dataflow library also allows more advanced scenarios. For example, what if you target multiple servers? In that case you could run defrag operations against each server concurrently, by creating a different block per server and passing each one a different connection string, e.g.:

    var block1 = new ActionBlock<string>(indexName => MyDefragMethod(indexName, connStr1), dopOptions);
    var block2 = new ActionBlock<string>(indexName => MyDefragMethod(indexName, connStr2), dopOptions);
    
    foreach(var indexName in indexesList1)
    {
        block1.Post(indexName);
    }
    foreach(var indexName in indexesList2)
    {
        block2.Post(indexName);
    }
    
    //Notify the blocks that we are done
    block1.Complete();
    block2.Complete();
    
    //and wait for all remaining indexes to finish
    await Task.WhenAll(block1.Completion, block2.Completion);
    

    You could also store the blocks in a dictionary keyed by server name, so that you can select the correct block inside the loop.
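
    A rough sketch of that idea follows (the workItems collection, its Server/IndexName properties and the GetConnectionString helper are illustrative placeholders, not part of the code above):

    var blocksByServer = new Dictionary<string, ActionBlock<string>>();
    
    foreach (var item in workItems)
    {
        //Create one block per server on first use, each with its own DOP limit
        if (!blocksByServer.TryGetValue(item.Server, out var block))
        {
            var connStr = GetConnectionString(item.Server);
            block = new ActionBlock<string>(indexName => MyDefragMethod(indexName, connStr), dopOptions);
            blocksByServer[item.Server] = block;
        }
    
        block.Post(item.IndexName);
    }
    
    //Notify every block that we are done and collect the completions
    var completions = new List<Task>();
    foreach (var block in blocksByServer.Values)
    {
        block.Complete();
        completions.Add(block.Completion);
    }
    
    //and wait for all remaining indexes to finish
    await Task.WhenAll(completions);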