Tags: c#, parallel-processing, task-parallel-library, parallel.foreach

Parallel - Adding items to the collection being iterated over, or equivalent?


Right now, I've got a C# program that performs the following steps on a recurring basis:

  • Grab current list of tasks from the database
  • Using Parallel.ForEach(), do work for each task

However, some of these tasks are very long-running. This delays the processing of other pending tasks because we only look for new ones at the start of the program.

Now, I know that modifying the collection being iterated over isn't possible (right?), but is there some equivalent functionality in the C# Parallel framework that would allow me to add work to the list while also processing items in the list?


Solution

  • Generally speaking, you're right that modifying a collection while iterating over it is not allowed. But there are other approaches you could use:

    • Use ActionBlock<T> from TPL Dataflow. The code could look something like:

      var actionBlock = new ActionBlock<MyTask>(
          task => DoWorkForTask(task),
          new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
      
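      while (true)
      {
          // assumes GrabCurrentListOfTasks() returns only tasks that haven't been posted yet
          var tasks = GrabCurrentListOfTasks();
          foreach (var task in tasks)
          {
              actionBlock.Post(task);
          }
      
          // wait between polls, so an empty result doesn't cause a busy loop
          await Task.Delay(someShortDelay);
          // or use Thread.Sleep() if you don't want to use async
      }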
      
    • Use BlockingCollection<T>, which can be modified while items are being consumed from it, along with GetConsumingPartitioner() from ParallelExtensionsExtras to make it work with Parallel.ForEach():

      var collection = new BlockingCollection<MyTask>();
      
      Task.Run(async () =>
      {
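          while (true)
          {
              // assumes GrabCurrentListOfTasks() returns only tasks that haven't been added yet
              var tasks = GrabCurrentListOfTasks();
              foreach (var task in tasks)
              {
                  collection.Add(task);
              }
      
              // wait between polls, so an empty result doesn't cause a busy loop
              await Task.Delay(someShortDelay);
          }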
      });
      
      Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));
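
If you'd rather not depend on ParallelExtensionsExtras, something similar can probably be done with the built-in Partitioner.Create() and EnumerablePartitionerOptions.NoBuffering (available since .NET 4.5). The following is only a minimal sketch under some assumptions: tasks are plain int IDs instead of MyTask, GrabCurrentListOfTasks(int batch) and DoWorkForTask(int, ConcurrentBag<int>) are hypothetical stand-ins for the database query and the real work, the class name ConsumingDemo is made up, and the producer stops after a fixed number of polls so that the loop can finish. MaxDegreeOfParallelism = 4 is an arbitrary value, set because workers block while the collection is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Hypothetical stand-in for the database query:
    // each poll returns three new task IDs.
    private static int[] GrabCurrentListOfTasks(int batch) =>
        Enumerable.Range(batch * 3, 3).ToArray();

    // Hypothetical stand-in for the real work; records the ID when done.
    private static void DoWorkForTask(int id, ConcurrentBag<int> done)
    {
        Thread.Sleep(10); // simulate work
        done.Add(id);
    }

    public static int[] Run(int batches)
    {
        var done = new ConcurrentBag<int>();
        using var collection = new BlockingCollection<int>();

        // Producer: polls for new work while the consumer is already running.
        var producer = Task.Run(async () =>
        {
            try
            {
                for (var batch = 0; batch < batches; batch++)
                {
                    foreach (var id in GrabCurrentListOfTasks(batch))
                        collection.Add(id);

                    await Task.Delay(20); // pause between polls
                }
            }
            finally
            {
                // Signals that no more items are coming, so the consumer can end.
                collection.CompleteAdding();
            }
        });

        // NoBuffering hands out items one at a time instead of in chunks,
        // so a worker doesn't sit on items while waiting to fill a chunk.
        var partitioner = Partitioner.Create(
            collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Returns once CompleteAdding() was called and the collection is drained.
        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            id => DoWorkForTask(id, done));

        producer.Wait(); // surfaces any exception from the producer
        return done.OrderBy(id => id).ToArray();
    }

    public static void Main()
    {
        // prints 0,1,2,3,4,5,6,7,8
        Console.WriteLine(string.Join(",", Run(batches: 3)));
    }
}
```

In a long-running service you would most likely replace the fixed number of polls with a CancellationToken and still call CompleteAdding() on shutdown, so that Parallel.ForEach() can return.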