Search code examples
c#asynchronoustask-parallel-libraryasync-ctpasync-await

How to properly run multiple async tasks in parallel?


What if you need to run multiple asynchronous I/O tasks in parallel but need to make sure that no more than X I/O processes are running at the same time; and pre and post I/O processing tasks shouldn't have such limitation.

Here is a scenario - let's say there are 1000 tasks; each of them accepts a text string as an input parameter; transforms that text (pre I/O processing) then writes that transformed text into a file. The goal is to make pre-processing logic utilize 100% of CPU/Cores and I/O portion of the tasks run with max 10 degree of parallelism (max 10 simultaneously opened for writing files at a time).

Can you provide a sample code how to do it with C# / .NET 4.5?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx


Solution

  • I think using TPL Dataflow for this would be a good idea: you create pre- and post-process blocks with unbounded parallelism, a file-writing block with limited parallelism and link them together. Something like:

    var unboundedParallelismOptions =
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };
    
    var preProcessBlock = new TransformBlock<string, string>(
        s => PreProcess(s), unboundedParallelismOptions);
    
    var writeToFileBlock = new TransformBlock<string, string>(
        async s =>
                {
                    await WriteToFile(s);
                    return s;
                },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
    
    var postProcessBlock = new ActionBlock<string>(
        s => PostProcess(s), unboundedParallelismOptions);
    
    var propagateCompletionOptions =
        new DataflowLinkOptions { PropagateCompletion = true };
    
    preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
    writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);
    
    // use something like await preProcessBlock.SendAsync("text") here
    
    preProcessBlock.Complete();
    await postProcessBlock.Completion;
    

    Where WriteToFile() could look like this:

    private static async Task WriteToFile(string s)
    {
        using (var writer = new StreamWriter(GetFileName()))
            await writer.WriteAsync(s);
    }