I have a TPL Dataflow pipeline where a target block is linked to two propagating blocks, which are then both linked to a source block. All are linked with PropagateCompletion = true
. The first propagating block is linked with a filter accepting only even numbers, where the second accepts all the remaining messages.
After posting the last message, I set the first block to completed. There seems to be a race condition though. The final block seems to sometimes handle all of the values, but sometimes only the values that were accepted by the first propagating block and only a part of the values that were accepted by the second propagating block.
I feel there's a race condition. But I have no clue how to properly instruct the final source block that everything is completed, only after both of the propagating blocks that are linked to it, forwarded all of their messages.
Here's my code stripped down to a simple example:
internal static class Program
{
public static async Task Main(string[] args)
{
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var bufferBlock = new BufferBlock<int>();
var fork1 = new TransformBlock<int, int>(n => n);
var fork2 = new TransformBlock<int, int>(n =>
{
Thread.Sleep(100);
return n;
});
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;
}
}
This outputs:
2
4
6
8
10
And I want it to output:
2
4
6
8
10
1
3
5
7
9
There is a diamond in your dataflow graph that causes the completion to propagate faster by either of two branches making the final block complete prematurely.
The completion of the last block can be customized using a task continuation:
...
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock); // no completion propagation
fork2.LinkTo(printBlock);
Task.WhenAll(fork1.Completion, fork2.Completion)
.ContinueWith(t => printBlock.Complete(),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;