c#, task-parallel-library, tpl-dataflow

ActionBlock never completes on Faulted State


I am writing a simple program to better understand TPL Dataflow. The goal is a long-running task that restarts a dataflow block once the block has completed.

My current RestartActionBlock task is able to await completion of the ActionBlock if I explicitly call Complete() on the block by entering "-1". But when I throw an exception to fault the block, or call the block's Fault() interface method, the ActionBlock's Completion task never seems to complete. In this situation the await singleTestFlow.Completion; call never continues.

After throwing the exception or calling the Fault() method, I can confirm that the block is in the faulted state by entering another input into the program and inspecting the block in the debugger:

[Screenshot: ActionBlock in the faulted state]

If the block is in the faulted state, why did the await singleTestFlow.Completion; never return?

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static ActionBlock<string> singleTestFlow;

    static void Main(string[] args)
    {
        //start thread that should restart a completed action block
        Task.Run(RestartActionBlock);

        Console.WriteLine("Enter -0 to exit, -1 to complete flow, -0- to throw an exception, Anything else otherwise");
        var input = Console.ReadLine();

        //allow user to input text until "-0" is entered
        while (!input.Equals("-0"))
        {
            if (input.Equals("-1"))
            {
                singleTestFlow.Complete();
            }

            singleTestFlow.Post(input);

            input = Console.ReadLine();
        }

        async Task RestartActionBlock()
        {
            var iterations = 0;
            while (true)
            {
                singleTestFlow = new ActionBlock<string>(s =>
                {
                    if (s.Equals("-0-"))
                    {
                        //throw new Exception("Something went wrong in here");
                        ((IDataflowBlock)singleTestFlow).Fault(new Exception("Something went wrong in here"));
                    }
                    Console.WriteLine($"{iterations}: " + s);

                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

                await singleTestFlow.Completion;

                var completionTask = singleTestFlow.Completion;

                var message = $"action block: {iterations} ";
                switch (completionTask.Status)
                {
                    case TaskStatus.RanToCompletion:
                        message += "ran to completion";
                        break;
                    case TaskStatus.Canceled:
                        message += "was canceled";
                        break;
                    case TaskStatus.Faulted:
                        message += "has faulted";
                        Console.WriteLine(completionTask.Exception);
                        break;
                }
                Console.WriteLine(message);
                iterations++;
            }
        }
    }


}

[Screenshot: sample console entry] The point in the console where I entered "eee" is where the faulted-block debug screenshot was taken.


Solution

  • Here's the simple fix:

    Replace

    await singleTestFlow.Completion; // Throws if Completion is faulted, breaking your loop.
    

    with

    await Task.WhenAny(singleTestFlow.Completion);
    

    or

    await singleTestFlow.Completion.ContinueWith(_ => { });
    

    Both alternatives await a wrapper task that completes successfully once Completion has finished in any state, so the Completion task's exception is not propagated. This allows your RestartActionBlock loop to execute forever as you intended (as opposed to dying right after your block faults).
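
    As a minimal sketch of the difference, the snippet below faults a block by throwing from its delegate and then waits with Task.WhenAny. The class and method names (CompletionDemo, AwaitWithoutThrowingAsync) are made up for illustration and are not part of the original program.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class CompletionDemo
    {
        // Hypothetical helper: faults a block, then waits for its Completion
        // task without letting the exception propagate to the caller.
        public static async Task<TaskStatus> AwaitWithoutThrowingAsync()
        {
            var block = new ActionBlock<string>(s =>
            {
                if (s == "-0-")
                {
                    throw new InvalidOperationException("Something went wrong in here");
                }
                Console.WriteLine(s);
            });

            // The item is accepted; the delegate then throws and faults the block.
            block.Post("-0-");

            // Task.WhenAny returns a Task<Task>. Awaiting it yields the finished
            // task itself instead of rethrowing that task's exception.
            Task finished = await Task.WhenAny(block.Completion);

            // The status can now be inspected safely, as the switch in the
            // question's loop does.
            return finished.Status;
        }
    }
    ```

    Calling await CompletionDemo.AwaitWithoutThrowingAsync() should return TaskStatus.Faulted, whereas a plain await block.Completion at the same point would throw.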

    Ideally, however, you should not be disregarding the return values of the following two statements:

    Task.Run(RestartActionBlock); // Unobserved Task. Usually a bad idea.
    

    and

    singleTestFlow.Post(input);
    

    If you were observing the return values of the above, you'd actually notice that they start screaming for help as soon as your ActionBlock<string> faults.

    The task returned by Task.Run(RestartActionBlock) transitions to the Faulted state as soon as await singleTestFlow.Completion throws, but you never get notified, because you neither keep a reference to that task nor check its status.

    Similarly, right after your ActionBlock<string> faults, subsequent calls to singleTestFlow.Post(input) actually return false, meaning that no more items get posted to the block. They are simply discarded.
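
    A rough sketch of observing both return values is shown below. The names ObservationDemo, PostAroundFaultAsync and ObserveWorkerAsync are hypothetical, and the worker is a simplified stand-in for the RestartActionBlock loop rather than the original code.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class ObservationDemo
    {
        // Hypothetical helper: creates a block whose delegate throws on "-0-".
        private static ActionBlock<string> CreateBlock()
        {
            return new ActionBlock<string>(s =>
            {
                if (s == "-0-")
                {
                    throw new InvalidOperationException("Something went wrong in here");
                }
                Console.WriteLine(s);
            });
        }

        // Reports whether Post accepted an item before and after the block faulted.
        public static async Task<(bool Before, bool After)> PostAroundFaultAsync()
        {
            var block = CreateBlock();

            bool before = block.Post("-0-");       // accepted; the delegate then throws
            await Task.WhenAny(block.Completion);  // wait for the fault without rethrowing
            bool after = block.Post("eee");        // declined by the faulted block

            return (before, after);
        }

        // Keeps a reference to the worker task so that its failure can be observed.
        public static async Task<TaskStatus> ObserveWorkerAsync()
        {
            Task worker = Task.Run(async () =>
            {
                var block = CreateBlock();
                block.Post("-0-");
                await block.Completion;            // rethrows, ending the worker
            });

            try
            {
                await worker;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Worker stopped: " + ex.Message);
            }

            return worker.Status;
        }
    }
    ```

    In this sketch, PostAroundFaultAsync should yield (true, false) and ObserveWorkerAsync should yield TaskStatus.Faulted. In the original program, checking the result of singleTestFlow.Post(input) and awaiting the task returned by Task.Run would surface the same two signals.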