
Notify task when other tasks complete


.NET TPL experts,

Note: I cannot use the TPL Dataflow library; no add-ons are allowed.

I have four tasks as shown in the diagram below:

[Diagram: task_1 feeds task_2 and task_3, which both feed task_4]

  • task_1 (data producer) -> reads records from a large file (more than 500,000 records) and adds them to a BlockingCollection.

  • task_2, task_3 (data consumers) -> Each of these tasks takes records from the BlockingCollection, performs some network-related work on each record, and, when that work completes, can add a record to the results queue. Order of processing is NOT important.

  • task_4 (results processor) -> takes records from the results queue and writes them to an output file.

I then wait for the tasks to complete, i.e.:

Task.WhenAll( t1, t2, t3, t4 )

So I have one producer task, MULTIPLE consumer tasks, and one task for saving the results.

My question is:

HOW do I notify task 4 when tasks 2 and 3 are completed, so that task 4 also knows when to end?

I have found many examples that "move" data from ONE task to another in a linear "pipeline" fashion, but none that illustrate the above; that is, how to notify task 4 when tasks 2 and 3 are both complete, so that it knows when to complete as well.

My initial thought is to "register" task 2 and 3 with task 4 and simply monitor the state of each registered task -- when task 2 and 3 are no longer running, then task 4 can stop (if the results queue is also empty).

Thanks in advance.


Solution

  • This is a bit of an extension of what Thomas already said.

    With a BlockingCollection you can call GetConsumingEnumerable() and treat the result as a normal foreach loop. The loop exits once the collection has been marked complete and is empty, so your tasks end "naturally". The only thing you need to add is one extra task that watches tasks 2 and 3 and, when both have ended, calls CompleteAdding() on the results collection they write to.

    private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
    private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();
    
    Task RunProcess()
    {
        Task1Start();
        var t2 = Stage2Start();
        var t3 = Stage2Start();
        Stage2MonitorStart(t2,t3);
        return Stage4Start();
    }
    
    public void Task1Start()
    {
        Task.Run(()=>
        {
            foreach(var item in GetFileSource())
            {
                var processedItem = Process(item);
                _stageOneBlockingCollection.Add(processedItem);
            }
            _stageOneBlockingCollection.CompleteAdding();
        });
    }
    
    public Task Stage2Start()
    {
        return Task.Run(()=>
        {
            foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
            {
                var processedItem = ProcessStage2(item);
                _stageTwoBlockingCollection.Add(processedItem);
            }
        });
    }
    
    void Stage2MonitorStart(params Task[] tasks)
    {
        //Once all tasks complete mark the collection complete adding.
        Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
    }
    
    public Task Stage4Start()
    {
        return Task.Run(()=>
        {
            foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
            {
                var processedItem = ProcessStage4(item);
                WriteToOutputFile(processedItem);
            }
        });
    }
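
For anyone who wants to try the pattern in isolation, here is a minimal self-contained sketch of the same idea. It rests on several assumptions that are not in the question: records are plain ints, the hypothetical DoWork method stands in for the network call, and results are collected into a list instead of being written to a file. The names PipelineSketch and RunAsync are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for the network-related work done per record.
    private static int DoWork(int record) => record * 2;

    public static async Task<List<int>> RunAsync(IEnumerable<int> source, int consumerCount = 2)
    {
        var input = new BlockingCollection<int>();
        var results = new BlockingCollection<int>();
        var output = new List<int>();

        // task_1: add every record, then signal that no more input will arrive.
        var producer = Task.Run(() =>
        {
            try
            {
                foreach (var record in source)
                    input.Add(record);
            }
            finally
            {
                // Always signal completion, otherwise the consumers would block forever.
                input.CompleteAdding();
            }
        });

        // task_2, task_3: each loop ends by itself once input is complete and drained.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var record in input.GetConsumingEnumerable())
                    results.Add(DoWork(record));
            }))
            .ToArray();

        // Monitor: once every consumer has finished, mark the results collection complete.
        var monitor = Task.WhenAll(consumers)
            .ContinueWith(_ => results.CompleteAdding(), TaskScheduler.Default);

        // task_4: ends by itself once results is complete and drained.
        var writer = Task.Run(() =>
        {
            foreach (var result in results.GetConsumingEnumerable())
                output.Add(result);
        });

        await Task.WhenAll(producer, Task.WhenAll(consumers), monitor, writer);
        return output;
    }
}
```

Calling `await PipelineSketch.RunAsync(Enumerable.Range(1, 100))` should return one result per input record, in no guaranteed order because the consumers run concurrently. Task 4 never inspects the state of tasks 2 and 3; it only sees its foreach loop end after the monitor calls CompleteAdding().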