.NET TPL experts,
Note: I cannot use the Dataflow library; no add-ons are allowed.
I have four tasks as shown in the diagram below:
task_1 (data_producer) -> reads records from a large file (>500000 records) and adds records to a BlockingCollection
task_2, task_3 (data_consumers) -> Each of these tasks takes records from the BlockingCollection, performs some (network-related) work on each record, and, when that work is complete, can add a record to the results queue. Order of processing is NOT important.
task_4 (results processor) -> Takes records from results_queue and writes to an output file.
I then wait for the tasks to complete, i.e.:
Task.WhenAll( t1, t2, t3, t4 )
So I have one producer task, MULTIPLE consumer tasks, and one task for saving the results.
My question is:
HOW do I notify task 4 when tasks 2 and 3 are completed, so that task 4 also knows when to end?
I have found many examples that "move" data from ONE task to another in a linear "pipeline" fashion, but none that illustrate the above; that is, how to notify task 4 when tasks 2 and 3 are complete, so that it knows when to complete as well.
My initial thought is to "register" tasks 2 and 3 with task 4 and simply monitor the state of each registered task: when tasks 2 and 3 are no longer running, task 4 can stop (provided the results queue is also empty).
Thanks in advance.
This is a bit of an extension of what Thomas already said.
With a BlockingCollection you can call GetConsumingEnumerable() on it and treat the result as a normal foreach loop. The loop blocks while the collection is empty and exits once the collection has been marked complete and drained, so your tasks end "naturally". The only thing you need to add is one extra task that watches tasks 2 and 3 and, once both have ended, calls CompleteAdding() on the collection they write to.
private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();
Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2, t3);
    // The returned task completes once the writer has drained the results collection.
    return Stage4Start();
}
public void Task1Start()
{
    Task.Run(() =>
    {
        foreach (var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        // Tell the stage-2 consumers that no more input is coming.
        _stageOneBlockingCollection.CompleteAdding();
    });
}
public Task Stage2Start()
{
    return Task.Run(() =>
    {
        // Blocks while the collection is empty; the loop ends after CompleteAdding() once it is drained.
        foreach (var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    });
}
void Stage2MonitorStart(params Task[] tasks)
{
    // Once all tasks complete, mark the collection as complete for adding.
    Task.WhenAll(tasks).ContinueWith(t => _stageTwoBlockingCollection.CompleteAdding());
}
public Task Stage4Start()
{
    return Task.Run(() =>
    {
        // Ends on its own once the monitor has called CompleteAdding() and all results are written.
        foreach (var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    });
}
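For anyone who wants to try the pattern end to end, here is a minimal self-contained sketch of the same idea as a console program. It is only an illustration under some assumptions: the records are plain integers instead of file lines, DoWork is a hypothetical stand-in for the network call, the "output file" is an in-memory list, and the names PipelineSketch and RunAsync are made up for this example. It also assumes C# 8 or later (for using declarations).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class PipelineSketch
{
    // Hypothetical stand-in for the network-bound work done by tasks 2 and 3.
    static string DoWork(int record) => $"result-{record}";

    public static async Task<List<string>> RunAsync(int recordCount, int consumerCount)
    {
        using var input = new BlockingCollection<int>();
        using var results = new BlockingCollection<string>();
        var written = new List<string>(); // stands in for the output file

        // Task 1: produce records, then signal that no more input is coming.
        var producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < recordCount; i++) input.Add(i);
            }
            finally
            {
                input.CompleteAdding();
            }
        });

        // Tasks 2..n: each loop ends once input is marked complete and drained.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var record in input.GetConsumingEnumerable())
                    results.Add(DoWork(record));
            }))
            .ToArray();

        // Monitor: runs when every consumer has finished (even if one faulted)
        // and tells the writer that no more results will arrive.
        var monitor = Task.WhenAll(consumers)
            .ContinueWith(_ => results.CompleteAdding(), TaskScheduler.Default);

        // Task 4: only this task touches 'written', so no lock is needed.
        var writer = Task.Run(() =>
        {
            foreach (var result in results.GetConsumingEnumerable())
                written.Add(result);
        });

        await Task.WhenAll(producer, monitor, writer);
        await Task.WhenAll(consumers); // surfaces consumer exceptions, if any
        return written;
    }

    static async Task Main()
    {
        var written = await RunAsync(recordCount: 10_000, consumerCount: 2);
        Console.WriteLine(written.Count); // prints 10000
    }
}
```

Because the continuation runs regardless of how the consumers finished, the writer cannot hang waiting for results that will never come; awaiting the consumer tasks afterwards is what makes any exception from them visible to the caller.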