
How to complete a Block after JoinBlock


I have the scenario below, which builds a data flow with TPL Dataflow. a and b are two data sources that are joined together and then passed through the rest of the pipeline.

var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);

var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
    Console.WriteLine("Handling '{0}'...", uri);
    return uri.Item1;
});

var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);

a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();

After running this, I see the following output in the console:

Handling '<1, 2>'...

1

That means the ActionBlock printReversedWords has processed its item. However, the program still blocks on the last line and never finishes.

Could anyone tell me what I should change?


Solution

  • To complete the pipeline, you need to call Complete() on the first blocks in the chain; in your case that means both a and b. You also need to propagate completion from the JoinBlock: in the question, join.LinkTo(transform) is called without linkOptions, so transform never learns that join has finished.

    Also, instead of a blocking call to .Wait() it's usually better to make the method async and await completion.

    When you configure the continuation, make sure it handles exceptions as well as successful completion, by faulting the downstream blocks when an upstream block faults. The completion wiring looks like this:

    // Propagate completion from the join onwards
    join.LinkTo(transform, linkOptions);
    transform.LinkTo(printReversedWords, linkOptions);

    a.Post(1);
    b.Post(1);

    // Complete the start of the pipeline
    a.Complete();
    b.Complete();

    // Once both sources have finished, complete the join, then wait for the last block
    await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
    await printReversedWords.Completion;
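
The advice above about handling exceptions in the continuation can be sketched as a complete program. This is only an illustration under a few assumptions: the System.Threading.Tasks.Dataflow package is referenced, the class name JoinCompletionSketch, the method RunAsync and the handled counter are made up for the example, and the ActionBlock is renamed to print. Fault is an explicit interface member on JoinBlock, hence the cast to IDataflowBlock.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JoinCompletionSketch
{
    // Hypothetical helper: builds the pipeline from the question and
    // returns how many items reached the final block.
    public static async Task<int> RunAsync()
    {
        var a = new TransformBlock<object, int>(_ => 1);
        var b = new TransformBlock<object, int>(_ => 2);
        var join = new JoinBlock<int, int>(
            new GroupingDataflowBlockOptions { Greedy = false });

        int handled = 0;
        var transform = new TransformBlock<Tuple<int, int>, int>(pair => pair.Item1);
        var print = new ActionBlock<int>(value =>
        {
            Console.WriteLine(value);
            handled++; // safe: ActionBlock processes one item at a time by default
        });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        a.LinkTo(join.Target1);
        b.LinkTo(join.Target2);
        join.LinkTo(transform, linkOptions);
        transform.LinkTo(print, linkOptions);

        // When both sources are done, either fault or complete the join.
        // The fault then flows downstream through the propagating links.
        _ = Task.WhenAll(a.Completion, b.Completion).ContinueWith(t =>
        {
            if (t.IsFaulted)
                ((IDataflowBlock)join).Fault(t.Exception);
            else
                join.Complete();
        });

        a.Post(1);
        b.Post(1);

        // Complete the start of the pipeline.
        a.Complete();
        b.Complete();

        await print.Completion;
        return handled;
    }
}
```

Calling await JoinCompletionSketch.RunAsync() from an async method should return once the single joined item has been printed. Note that Task.WhenAll only finishes when both a and b have finished, so if one source faults the other still has to be completed (or faulted) for the continuation to run.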