I am working on a project where a need arose that is a perfect fit for TPL Dataflow. Since my experience with it is relatively limited (and what I do have is from some time ago), I have been brushing up by reading Microsoft's documentation as well as articles I can find online.
Having done that, I built my code to chain together a series of blocks (mostly `TransformBlock`s, ending with an `ActionBlock`), doing something like this:
```csharp
var block1 = new TransformBlock<T, U>(async input => { /* ... */ });
var block2 = new TransformBlock<U, V>(async input => { /* ... */ });
var block3 = new ActionBlock<V>(async input => { /* ... */ });

block1.LinkTo(block2);
block2.LinkTo(block3);

foreach (var item in items)
{
    await block1.SendAsync(item);
}

block1.Complete();
await block3.Completion;
```
An article I read (which I can no longer find) suggested adding continuation tasks to the pipeline to mark the blocks as complete. This is the code it provided:
```csharp
// Create the continuation tasks in the pipeline that mark each block as complete.
await block1.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block2).Fault(t.Exception); }
    else { block2.Complete(); }
});
await block2.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block3).Fault(t.Exception); }
    else { block3.Complete(); }
});
```
I will admit that I don't fully understand what this code is doing or whether it is even needed. When I try to run it in the code I just wrote, execution hangs on the first `ContinueWith` and never gets as far as running the pipeline.
I would appreciate additional explanation since I want to get a better understanding of the nuances of what is going on here.
For a linear pipeline, all you need to do is set `PropagateCompletion` on the links. That option propagates completion as well as faults, whose exceptions are then attached to the final block's `Completion` task:
```csharp
var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
block1.LinkTo(block2, linkOptions);
block2.LinkTo(block3, linkOptions);
```
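To see both behaviours end to end, here is a minimal, self-contained sketch of the linear pipeline with `PropagateCompletion` set. The concrete types (`int` → `int` → `string`), the class name `LinearPipelineDemo`, its `injectFault` parameter, the `Unwrap` helper and the doubling/formatting logic are all hypothetical stand-ins for `T`, `U`, `V` and the real work. It assumes `System.Threading.Tasks.Dataflow` is referenced (via the NuGet package of that name if your target framework does not already include it).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinearPipelineDemo
{
    // Hypothetical pipeline: int -> int (doubled) -> string, collected by the ActionBlock.
    public static async Task<List<string>> RunAsync(IEnumerable<int> items, bool injectFault = false)
    {
        var results = new ConcurrentQueue<string>();

        var block1 = new TransformBlock<int, int>(async input =>
        {
            await Task.Yield(); // stand-in for real asynchronous work
            return input * 2;
        });
        var block2 = new TransformBlock<int, string>(input =>
        {
            // Hypothetical fault: throw when the doubled value is 6 (original item 3).
            if (injectFault && input == 6) throw new InvalidOperationException("boom");
            return $"item {input}";
        });
        var block3 = new ActionBlock<string>(input => results.Enqueue(input));

        // PropagateCompletion forwards both Complete() and faults down each link.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        foreach (var item in items)
        {
            await block1.SendAsync(item);
        }
        block1.Complete();

        // Completes after every block has drained, or throws if an upstream block faulted.
        await block3.Completion;
        return results.ToList();
    }

    // Flattens any (possibly nested) AggregateException into its underlying exceptions.
    public static IEnumerable<Exception> Unwrap(Exception ex)
    {
        return ex is AggregateException agg
            ? agg.Flatten().InnerExceptions
            : (IEnumerable<Exception>)new[] { ex };
    }
}
```

Calling `RunAsync(new[] { 1, 2, 3 })` should yield `item 2`, `item 4`, `item 6`. With `injectFault: true`, `await block3.Completion` throws instead of hanging; because the fault is wrapped on its way down the links, the exception you catch may be an `AggregateException`, which is why `Unwrap` flattens it before looking for the original `InvalidOperationException`.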
The continuations are unnecessary. But if your pipeline is not linear (for example, several blocks feeding into one), you'll need to handle completion and fault propagation yourself, as shown here.
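A situation where manual propagation does earn its keep is fan-in. If two blocks both link to one target with `PropagateCompletion = true`, the target completes as soon as the first source completes, and further items from the other source are declined. The sketch below (hypothetical class `FanInDemo`, with `sourceA`/`sourceB` as stand-in sources) links without propagation and completes the target only when `Task.WhenAll` over both sources' `Completion` finishes, reusing the fault-or-complete logic from the question's continuation code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanInDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var received = new ConcurrentBag<int>();

        // Two hypothetical sources feeding a single target.
        var sourceA = new TransformBlock<int, int>(x => x);
        var sourceB = new TransformBlock<int, int>(x => x + 100);
        var target = new ActionBlock<int>(x => received.Add(x));

        // No PropagateCompletion: the first source to finish would otherwise complete the target early.
        sourceA.LinkTo(target);
        sourceB.LinkTo(target);

        // Fault or complete the target only once BOTH sources are done.
        // The continuation is registered here but deliberately not awaited.
        _ = Task.WhenAll(sourceA.Completion, sourceB.Completion).ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)target).Fault(t.Exception);
            else target.Complete();
        });

        for (int i = 0; i < 3; i++)
        {
            await sourceA.SendAsync(i);
            await sourceB.SendAsync(i);
        }
        sourceA.Complete();
        sourceB.Complete();

        await target.Completion;
        return received.OrderBy(x => x).ToList();
    }
}
```

`RunAsync()` should return `0, 1, 2, 100, 101, 102`, i.e. every item from both sources reaches the target before it completes.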
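> the code hangs on the first `ContinueWith`

That happens because you `await` the continuation. A continuation on `block1.Completion` only runs after `block1` completes, so if you await it before calling `block1.Complete()`, that call is never reached and `block1` never completes. When wiring completion up by hand, create the continuations without awaiting them, then send the items, call `block1.Complete()`, and await the last block's `Completion`.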
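For reference, here is the question's continuation code rearranged so it does not hang. It is a minimal sketch assuming hypothetical `int` → `int` → `string` stages; `ManualContinuationDemo` is an illustrative name. The only substantive change is that the `ContinueWith` tasks are registered without being awaited, so execution reaches the `SendAsync` loop and `block1.Complete()`. In a linear pipeline this does the same job as `PropagateCompletion = true`, so the link option remains the simpler choice there.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualContinuationDemo
{
    public static async Task<List<string>> RunAsync(IEnumerable<int> items)
    {
        var results = new ConcurrentQueue<string>();
        var block1 = new TransformBlock<int, int>(x => x * 2);
        var block2 = new TransformBlock<int, string>(x => $"item {x}");
        var block3 = new ActionBlock<string>(x => results.Enqueue(x));

        // Plain links: completion is forwarded manually below.
        block1.LinkTo(block2);
        block2.LinkTo(block3);

        // Register the continuations WITHOUT awaiting them. Awaiting here would wait for
        // block1 to complete, which cannot happen before block1.Complete() is called.
        _ = block1.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)block2).Fault(t.Exception);
            else block2.Complete();
        });
        _ = block2.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)block3).Fault(t.Exception);
            else block3.Complete();
        });

        foreach (var item in items)
        {
            await block1.SendAsync(item);
        }
        block1.Complete();

        // block1 completes -> continuation completes block2 -> continuation completes block3.
        await block3.Completion;
        return results.ToList();
    }
}
```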