I am writing a simple program to better understand TPL Dataflow. I am trying to create a long-running task that restarts a dataflow block whenever it has completed.
My current RestartActionBlock task is able to await completion of the ActionBlock if I explicitly call Complete() on the block by entering "-1". But when I raise an exception to fault the block, or call the block's Fault() interface method, the ActionBlock's Completion task never seems to complete. In this situation the await singleTestFlow.Completion; call never continues.
After raising the exception or calling the Fault() method, I can confirm that the block is in the faulted state by entering another input into the program and inspecting the block in the debugger.
If the block is in the faulted state, why did await singleTestFlow.Completion; never return?
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Program
    {
        private static ActionBlock<string> singleTestFlow;

        static void Main(string[] args)
        {
            //start thread that should restart a completed action block
            Task.Run(RestartActionBlock);
            Console.WriteLine("Enter -0 to exit, -1 to complete flow, -0- to throw an exception, Anything else otherwise");
            var input = Console.ReadLine();
            //allow user to input text until "-0" is entered
            while (!input.Equals("-0"))
            {
                if (input.Equals("-1"))
                {
                    singleTestFlow.Complete();
                }
                singleTestFlow.Post(input);
                input = Console.ReadLine();
            }

            async Task RestartActionBlock()
            {
                var iterations = 0;
                while (true)
                {
                    singleTestFlow = new ActionBlock<string>(s =>
                    {
                        if (s.Equals("-0-"))
                        {
                            //throw new Exception("Something went wrong in here");
                            ((IDataflowBlock)singleTestFlow).Fault(new Exception("Something went wrong in here"));
                        }
                        Console.WriteLine($"{iterations}: " + s);
                    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
                    await singleTestFlow.Completion;
                    var completionTask = singleTestFlow.Completion;
                    var message = $"action block: {iterations} ";
                    switch (completionTask.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            message += "ran to completion";
                            break;
                        case TaskStatus.Canceled:
                            message += "was canceled";
                            break;
                        case TaskStatus.Faulted:
                            message += "has faulted";
                            Console.WriteLine(completionTask.Exception);
                            break;
                    }
                    Console.WriteLine(message);
                    iterations++;
                }
            }
        }
    }
The point in the console where I entered "eee" is where the faulted block debug screenshot was taken.
Here's the simple fix:
Replace

    await singleTestFlow.Completion; // Throws if Completion is faulted, breaking your loop.

with

    await Task.WhenAny(singleTestFlow.Completion);

or

    await singleTestFlow.Completion.ContinueWith(_ => { });
What the above does is avoid propagating the Completion task's exception. This allows your RestartActionBlock loop to execute forever as you intended (as opposed to dying right after your block faults).
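To make the difference concrete, here is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced. The class and method names (CompletionDemo, DirectAwaitThrowsAsync, WhenAnyStatusAsync) are made up for illustration and are not part of your program. It faults a block the same way your commented-out throw would, then compares awaiting Completion directly with awaiting it through Task.WhenAny.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
public static class CompletionDemo
{
    // Builds a block that faults when it receives "-0-".
    private static ActionBlock<string> CreateBlock()
    {
        return new ActionBlock<string>(s =>
        {
            if (s == "-0-")
            {
                throw new InvalidOperationException("Something went wrong in here");
            }
            Console.WriteLine(s);
        });
    }

    // Awaiting a faulted Completion task directly rethrows the exception.
    public static async Task<bool> DirectAwaitThrowsAsync()
    {
        var block = CreateBlock();
        block.Post("-0-");
        try
        {
            await block.Completion;
            return false; // not reached when the block faults
        }
        catch (Exception)
        {
            return true;
        }
    }

    // Task.WhenAny completes successfully once Completion finishes,
    // whatever its final state, so the status can be inspected afterwards.
    public static async Task<TaskStatus> WhenAnyStatusAsync()
    {
        var block = CreateBlock();
        block.Post("-0-");
        await Task.WhenAny(block.Completion);
        return block.Completion.Status;
    }
}
```

In your loop, the first behavior is what ends RestartActionBlock: the exception leaves the while (true) before the switch on completionTask.Status is ever reached.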
Ideally, however, you should not be disregarding the return values of the following two statements:

    Task.Run(RestartActionBlock); // Unobserved Task. Usually a bad idea.

and

    singleTestFlow.Post(input);

If you were observing the return values of the above, you'd actually notice that they start screaming for help as soon as your ActionBlock<string> faults.
The task returned by Task.Run(RestartActionBlock) transitions to the Faulted state as soon as await singleTestFlow.Completion throws, but you never get notified, because you don't keep a reference to said task or check its status.
Similarly, right after your ActionBlock<string> faults, subsequent calls to singleTestFlow.Post(input) actually return false, meaning that no more items get posted to the block. They are simply discarded.
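Both symptoms can be observed with a small sketch like the one below, again assuming the System.Threading.Tasks.Dataflow package is referenced. ObserveDemo and its method names are hypothetical, chosen only for this example. One method checks what Post returns once the block has faulted; the other keeps a reference to the task returned by Task.Run and inspects its status.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, for illustration only.
public static class ObserveDemo
{
    // Builds a block that faults when it receives "-0-".
    private static ActionBlock<string> CreateBlock()
    {
        return new ActionBlock<string>(s =>
        {
            if (s == "-0-")
            {
                throw new InvalidOperationException("Something went wrong in here");
            }
            Console.WriteLine(s);
        });
    }

    // Returns the result of Post on a block that has already faulted.
    public static async Task<bool> PostAfterFaultAsync()
    {
        var block = CreateBlock();
        block.Post("-0-");                    // accepted, then faults the block
        await Task.WhenAny(block.Completion); // wait for the fault without throwing
        return block.Post("hello");           // declined: the block is done
    }

    // Returns the status of a Task.Run task whose body awaits a faulted Completion.
    public static async Task<TaskStatus> WorkerStatusAsync()
    {
        var block = CreateBlock();
        Task worker = Task.Run(async () => { await block.Completion; });
        block.Post("-0-");
        await Task.WhenAny(worker);           // observe the worker without rethrowing
        return worker.Status;
    }
}
```

Keeping the worker task in a variable, as in WorkerStatusAsync, is what lets you notice the failure. In your Main you could do the same and check the task (or its Exception) instead of discarding it.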