c# task-parallel-library tpl-dataflow

How do I signal completion of my dataflow?


I've got a class that implements a dataflow composed of 3 steps using TPL Dataflow.

In the constructor I create the steps as TransformBlocks and link them up using LinkTo with DataflowLinkOptions.PropagateCompletion set to true. The class exposes a single method which kicks off the workflow by calling SendAsync on the 1st step. The method returns the "Completion" property of the final step of the workflow.

At the moment the steps in the workflow appear to execute as expected, but the final step never completes unless I explicitly call Complete on it. But doing that short-circuits the workflow and none of the steps are executed. What am I doing wrong?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;

   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });

      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }

   public Task Push(object message) {
      step1.SendAsync(message);
      step1.Complete();
      return step3.Completion;
   }
}
...
public class Program {
  public static void Main(string[] args) {
    var pipeline = new MessagePipeline();
    var result = pipeline.Push("Hello, world!");
    result.ContinueWith(_ => Console.WriteLine("Completed"));
    Console.ReadLine();
  }
}

Solution

  • When you link the steps, you need to pass a DataflowLinkOptions with the PropagateCompletion property set to true to propagate both completion and errors. Once you do that, calling Complete() on the first block will propagate completion to downstream blocks.

    Once a block receives the completion event, it finishes processing then notifies its linked downstream targets.

    This way you can post all your data to the first step and call Complete(). The final block will only complete when all upstream blocks have completed.

    For example,

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    myFirstBlock.LinkTo(mySecondBlock, linkOptions);
    mySecondBlock.LinkTo(myFinalBlock, linkOptions);
    
    foreach(var message in messages)
    {
        myFirstBlock.Post(message);
    }
    myFirstBlock.Complete();
    ......
    await myFinalBlock.Completion;
    

    PropagateCompletion isn't true by default because in more complex scenarios (e.g., non-linear flows or dynamically changing flows) you don't want completion and errors to propagate automatically. You may also want to avoid automatic completion if you want to handle errors without terminating the entire flow.
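
    As an illustration of a non-linear flow, here is a minimal sketch in which two sources feed one target. With PropagateCompletion = true on both links, the target would stop accepting input as soon as the first source completed, so completion is forwarded by hand once both sources are done. The names FanInExample, RunAsync, sourceA, sourceB and target are hypothetical, and fault propagation is deliberately left out.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class FanInExample
    {
        // Hypothetical helper: returns how many messages reached the target.
        public static async Task<int> RunAsync()
        {
            var received = 0;
            var sourceA = new BufferBlock<int>();
            var sourceB = new BufferBlock<int>();

            // ActionBlock processes one message at a time by default,
            // so the plain increment is not subject to races here.
            var target = new ActionBlock<int>(x => { received++; });

            // No PropagateCompletion on these links: either source finishing
            // on its own must not complete the shared target.
            sourceA.LinkTo(target);
            sourceB.LinkTo(target);

            // Complete the target only after BOTH sources have completed.
            // Assumption: errors are ignored; a real flow would inspect the
            // antecedent task and fault the target if a source faulted.
            var forward = Task.WhenAll(sourceA.Completion, sourceB.Completion)
                              .ContinueWith(t => target.Complete());

            sourceA.Post(1);
            sourceA.Post(2);
            sourceB.Post(3);

            sourceA.Complete();
            sourceB.Complete();

            await target.Completion;
            return received;
        }
    }
    ```

    Calling await FanInExample.RunAsync() should yield 3, since all three posted messages reach the target before it completes.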

    Way back when TPL Dataflow was in beta, the default was true, but this was changed for the RTM release.

    UPDATE

    The code never completes because the final step is a TransformBlock with no linked target to receive its output. A TransformBlock completes only after its output buffer has been emptied, so even though the block received the completion signal, the unconsumed result keeps its Completion task from finishing.

    Changing it to an ActionBlock<object> removes the issue.
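
    A sketch of the question's pipeline with that change applied is shown below. Apart from turning step3 into an ActionBlock<object>, it also awaits SendAsync and uses an async Main instead of ContinueWith and Console.ReadLine; those extra changes are this example's assumptions, not something the original code requires.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class MessagePipeline
    {
        private readonly TransformBlock<object, object> step1;
        private readonly TransformBlock<object, object> step2;
        // The last step produces no output, so nothing is left unconsumed.
        private readonly ActionBlock<object> step3;

        public MessagePipeline()
        {
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

            step1 = new TransformBlock<object, object>(x =>
            {
                Console.WriteLine("Step1...");
                return x;
            });
            step2 = new TransformBlock<object, object>(x =>
            {
                Console.WriteLine("Step2...");
                return x;
            });
            step3 = new ActionBlock<object>(x => Console.WriteLine("Step3..."));

            step1.LinkTo(step2, linkOptions);
            step2.LinkTo(step3, linkOptions);
        }

        public async Task Push(object message)
        {
            // Wait until the first block has accepted the message,
            // then signal that no more input will follow.
            await step1.SendAsync(message);
            step1.Complete();

            // Completes once step3 has processed everything it received.
            await step3.Completion;
        }
    }

    public static class Program
    {
        public static async Task Main()
        {
            var pipeline = new MessagePipeline();
            await pipeline.Push("Hello, world!");
            Console.WriteLine("Completed");
        }
    }
    ```

    Because Push calls Complete() on the first block, this pipeline accepts only a single message, as in the question. If the last step has to stay a TransformBlock, linking it to DataflowBlock.NullTarget<object>() to discard its output should also let it complete.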