Tags: .net, async-await, task-parallel-library, tpl-dataflow

.NET queued tasks (with async/await)


I have a large number of tasks (~1000) that need to be executed. I am running on a 4-core processor, so I'd like to process 4 tasks at a time, in parallel.

To give you a starting point, here is some sample code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    public class LongOperation
    {
        private static readonly Random RandomNumberGenerator = new Random(0);
        const int UpdateFrequencyMilliseconds = 100;

        public int CurrentProgress { get; set; }

        public int TargetProcess { get; set; }

        public LongOperation()
        {
            TargetProcess = RandomNumberGenerator.Next(
                (int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds, 
                (int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
        }

        public async Task Execute()
        {
            while (!IsCompleted)
            {
                await Task.Delay(UpdateFrequencyMilliseconds);
                CurrentProgress++;
            }
        }

        public bool IsCompleted => CurrentProgress >= TargetProcess;
    }

    static void Main(string[] args)
    {
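        // Task.Run unwraps the inner Task, so Wait() blocks until the async lambda finishes.
        // (Task.Factory.StartNew would return a Task<Task> and Wait() would return too early.)
        Task.Run(async () =>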
        {
            var operations = new List<LongOperation>();

            for(var x = 1; x <= 10; x++)
                operations.Add(new LongOperation());

            await ProcessOperations(4, operations);
        }).Wait();
    }

    public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
    {
        await Task.WhenAll(operations.Select(x => x.Execute()));
        // TODO: Process up to 4 operations at a time, until every operation is completed.
    }
}

I'd like some input on which classes to use, and how to structure ProcessOperations so that it processes up to 4 operations at a time, until all operations are completed, as a single awaitable Task.

I'm thinking of using a SemaphoreSlim object in some way, since it seems to be geared towards throttling a resource/process.


Solution

  • As has already been suggested, you can use the TPL Dataflow library with two blocks: one to store the messages before processing, and one to perform the actual work on them:

    // storage
    var operations = new BufferBlock<LongOperation>();
    // no more than 4 actions at a time
    var actions = new ActionBlock<LongOperation>(x => x.Execute(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
    
    // consume new operations automatically
    operations.LinkTo(actions);
    for(var x = 1; x <= 10; ++x)
    {
        // synchronous send: returns immediately, and returns false if the block declines the item
        operations.Post(new LongOperation());
        // awaitable send for async operations
        // await operations.SendAsync(new LongOperation());
    }
    

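    You can also introduce throttling limits, such as no more than 30 buffered operations at a time, by setting the BoundedCapacity option on your buffer. With a bounded buffer, Post returns false when the buffer is full, so SendAsync is the better fit there.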
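
To tie the pieces above together into the single awaitable Task the question asks for, here is a minimal sketch. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced. The names ThrottledRunner, ProcessAsync and bufferCapacity are hypothetical and not part of the original answer. The sketch also sets BoundedCapacity on the ActionBlock, on the assumption that you want the buffer's limit to actually take effect; otherwise the ActionBlock's own unbounded input queue would drain the buffer immediately. Error handling is left out: if the work delegate throws, the ActionBlock faults and stops accepting items, so real code would also need to watch actions.Completion while sending.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class ThrottledRunner
{
    // Hypothetical helper: runs `work` for every item with at most
    // `maxSimultaneous` items in flight, and completes once all are done.
    public static async Task ProcessAsync<T>(
        int maxSimultaneous,
        IEnumerable<T> items,
        Func<T, Task> work,
        int bufferCapacity = 30)
    {
        // storage, limited to `bufferCapacity` waiting items
        var buffer = new BufferBlock<T>(
            new DataflowBlockOptions { BoundedCapacity = bufferCapacity });

        // no more than `maxSimultaneous` actions at a time; the bounded capacity
        // keeps the remaining items waiting in the buffer instead of in this block
        var actions = new ActionBlock<T>(work,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxSimultaneous,
                BoundedCapacity = maxSimultaneous
            });

        // forward items automatically and pass completion downstream
        buffer.LinkTo(actions, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items)
        {
            // awaits (without blocking a thread) while the buffer is full
            await buffer.SendAsync(item);
        }

        // signal that no more items will arrive, then wait for the work to drain
        buffer.Complete();
        await actions.Completion;
    }
}
```

With this in place, the body of ProcessOperations from the question could be reduced to `await ThrottledRunner.ProcessAsync(maxSimultaneous, operations, x => x.Execute());`.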