Tags: c#, c#-4.0, task-parallel-library

Simple Parallel Tasks with Continuation


I have been reading for the last 2 hours and I am still confused. Some say use StartNew, some say Task.Run, and some say something else. I do know that Task.Run is giving me a compile error.

I need to start multiple tasks in parallel and then, when each completes successfully, run a continuation task. Knowing when all are done with blocking would be helpful.

Here is what I have:

    public void DoSomeWork(object workItem)
    {
        var tasks = new Task<ResultArgs>[_itemList.Count];

        for (int loopCnt = 0; loopCnt < _itemList.Count; loopCnt++)
        {
            tasks[loopCnt] = new Task<ResultArgs>.Run(() =>
            {
                return _itemList[loopCnt].Analyze(workItem);
            });
            tasks[loopCnt].ContinueWith(ReportResults, TaskContinuationOptions.ExecuteSynchronously);
        }
    }

The compiler says Run does not exist in Task.

Obviously, I have something wrong, but I do not know what.

How do I get past this problem?


Solution

  • You can either do this with async methods or flow your items into a dataflow. The following code uses TPL Dataflow to process your items, pass them to your second processing step, and finally await completion of processing.

    using NUnit.Framework;
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace AsyncProcessing {
    
        [TestFixture]
        public class PipelineTests {
    
            [Test]
            public async Task RunPipeline() {
                var pipeline = new MyPipeline();
                var data = Enumerable.Range(0, 1000).Select(x => new WorkItem(x, x));
    
                foreach(var item in data) {
                    await pipeline.SendAsync(item);
                }
    
                pipeline.Complete();
                await pipeline.Completion;
    
                // All processing is complete; results are in pipeline.OutputBuffer.
            }
        }
    
        class MyPipeline {
    
            private BufferBlock<WorkItem> inputBuffer;
            private TransformBlock<WorkItem, WorkItem> analyzeBlock;
            private TransformBlock<WorkItem, ResultArg> reportBlock;
            private ActionBlock<ResultArg> postOutput;
    
            public ConcurrentBag<ResultArg> OutputBuffer { get; }
            public Task Completion { get { return postOutput.Completion; } }
    
            public MyPipeline() {
                OutputBuffer = new ConcurrentBag<ResultArg>();
                CreatePipeline();
                LinkPipeline();
            }
    
            public void Complete() {
                inputBuffer.Complete();
            }
    
            public async Task SendAsync(WorkItem data) {
                await inputBuffer.SendAsync(data);
            }
    
            public void CreatePipeline() {
                var options = new ExecutionDataflowBlockOptions() {
                    MaxDegreeOfParallelism = Environment.ProcessorCount,
                    BoundedCapacity = 10
                };
    
                inputBuffer = new BufferBlock<WorkItem>(options);
    
                analyzeBlock = new TransformBlock<WorkItem, WorkItem>(item => {
                    // Analyze the item here...
                    return item;
                }, options);
    
                reportBlock = new TransformBlock<WorkItem, ResultArg>(item => {
                    //report your results, email.. db... etc.
                    return new ResultArg(item.JobId, item.WorkValue);
                }, options);
    
                postOutput = new ActionBlock<ResultArg>(item => {
                    OutputBuffer.Add(item);
                }, options);
            }
    
            public void LinkPipeline() {
                var options = new DataflowLinkOptions() {
                    PropagateCompletion = true,
                };
    
                inputBuffer.LinkTo(analyzeBlock, options);
                analyzeBlock.LinkTo(reportBlock, options);
                reportBlock.LinkTo(postOutput, options);
            }
        }
    
        public class WorkItem {
    
            public int JobId { get; set; }
            public int WorkValue { get; set; }
    
            public WorkItem(int id, int workValue) {
                this.JobId = id;
                this.WorkValue = workValue;
            }
        }
    
        public class ResultArg {
    
            public int JobId { get; set; }
            public int Result { get; set; }
    
            public ResultArg(int id, int result) {
                this.JobId = id;
                this.Result = result;
            }
        }
    }
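
If you would rather stay with plain tasks, as in the question, here is a minimal sketch that avoids Task.Run entirely. The question is tagged c#-4.0, so a likely cause of the compile error is that Task.Run was only added in .NET Framework 4.5. Also, `new Task<ResultArgs>.Run(...)` is not valid either way, because Run is a static method and cannot follow `new`. The sketch uses Task.Factory.StartNew, copies the list element into a local so the lambda does not capture the changing loop counter, and uses ContinueWhenAll to signal when everything has finished. `Analyzer`, `Worker` and the `report` callback are hypothetical stand-ins for the types in the question, and error handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's types (assumptions, not the real ones).
public class ResultArgs
{
    public int JobId { get; set; }
    public int Result { get; set; }
}

public class Analyzer
{
    private readonly int _id;
    public Analyzer(int id) { _id = id; }

    // Placeholder for the real analysis work.
    public ResultArgs Analyze(object workItem)
    {
        return new ResultArgs { JobId = _id, Result = _id * 2 };
    }
}

public class Worker
{
    private readonly List<Analyzer> _itemList;
    private readonly Action<ResultArgs> _report; // may be called from several threads at once

    public Worker(List<Analyzer> itemList, Action<ResultArgs> report)
    {
        _itemList = itemList;
        _report = report;
    }

    // Returns a task that completes once every analysis and its continuation has finished.
    public Task DoSomeWork(object workItem)
    {
        // ContinueWhenAll rejects an empty array, so handle an empty list up front.
        if (_itemList.Count == 0)
        {
            var tcs = new TaskCompletionSource<object>();
            tcs.SetResult(null);
            return tcs.Task;
        }

        var continuations = new Task[_itemList.Count];

        for (int loopCnt = 0; loopCnt < _itemList.Count; loopCnt++)
        {
            // Copy the element: capturing loopCnt itself would read a later value
            // (possibly out of range) by the time the lambda actually runs.
            var item = _itemList[loopCnt];

            Task<ResultArgs> analysis = Task.Factory.StartNew(() => item.Analyze(workItem));

            // Runs only if the analysis finished successfully; otherwise it is canceled.
            continuations[loopCnt] = analysis.ContinueWith(
                t => _report(t.Result),
                TaskContinuationOptions.OnlyOnRanToCompletion |
                TaskContinuationOptions.ExecuteSynchronously);
        }

        // Completes when all continuations have ended, whatever their final state.
        return Task.Factory.ContinueWhenAll(continuations, done => { });
    }
}
```

To block until everything is done, call `worker.DoSomeWork(workItem).Wait();`. To stay non-blocking, attach another ContinueWith to the returned task. On .NET 4.5 or later, Task.Run and Task.WhenAll can replace StartNew and ContinueWhenAll, and the returned task can be awaited.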