I have the scenario below, which performs a data flow with TPL. Blocks a and b are data sources that are joined together and then passed through the data flow.
var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);
var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
    Console.WriteLine("Handling '{0}'...", uri);
    return uri.Item1;
});
var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();
After running this, I see the following output in the console:
Handling '<1, 2>'...
1
That means the ActionBlock printReversedWords has processed its input. However, the program still waits on the last line and never finishes.
Could anyone tell me what I should change?
To complete the pipeline, you need to call Complete on the first block in the chain. In your case, make sure Complete() is called on both blocks a and b. You'll also need to propagate completion from the JoinBlock to the block it is linked to.
Also, instead of a blocking call to .Wait(), it's usually better to make the method async and await completion.
When you configure the continuation, make sure to handle exceptions as well as successful completion by faulting the downstream blocks. The basic changes are:
// Propagate completion from the join block through the rest of the pipeline
join.LinkTo(transform, linkOptions);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
// Complete the start of the pipeline
a.Complete();
b.Complete();
// Once both sources have finished, complete the join block
await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
await printReversedWords.Completion;
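The advice about faulting the downstream blocks could look roughly like the sketch below. It is one possible way to do it, not the only one. It assumes the System.Threading.Tasks.Dataflow package is referenced. The names PipelineSketch, RunAsync, sink, and joinDone are made up for illustration, and the final block collects values into a list instead of printing them so the result can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Builds the pipeline from the question, runs it to completion,
    // and returns the values that reached the final block.
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        var a = new TransformBlock<object, int>(_ => 1);
        var b = new TransformBlock<object, int>(_ => 2);
        var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
        var transform = new TransformBlock<Tuple<int, int>, int>(pair => pair.Item1);
        var sink = new ActionBlock<int>(value => results.Add(value));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        a.LinkTo(join.Target1);
        b.LinkTo(join.Target2);
        join.LinkTo(transform, linkOptions);
        transform.LinkTo(sink, linkOptions);

        // Illustrative continuation: if either source faulted, fault the
        // join block so the error flows downstream; otherwise complete it.
        var joinDone = Task.WhenAll(a.Completion, b.Completion).ContinueWith(t =>
        {
            if (t.IsFaulted)
                ((IDataflowBlock)join).Fault(t.Exception);
            else
                join.Complete();
        });

        a.Post(1);
        b.Post(1);

        // Complete the start of the pipeline.
        a.Complete();
        b.Complete();

        await joinDone;
        await sink.Completion;
        return results;
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Because the links from join onward use PropagateCompletion, a fault raised on the join block reaches the final block as well, so awaiting sink.Completion surfaces the exception instead of hanging.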