I am looking for an alternative to JoinBlock that can be linked to by n TransformBlocks and that joins/merges the messages of all those TransformBlock source blocks, so that a collection of them can be passed on to another dataflow block.
JoinBlock does the job fine, but it is limited to hooking up to 3 source blocks. It also suffers from quite a number of inefficiencies (it is very slow to join even value types such as ints from 2 source blocks). Is there a way to have Tasks returned from the TransformBlocks and wait until every TransformBlock has a completed task to pass on before accepting the Task<item>?
Any alternative ideas? I potentially have 1-20 such transform blocks whose items I need to join together before passing on the joined item collection. Each transform block is guaranteed to return exactly one output item for each input item "transformed".
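Because each transform block emits exactly one output per input, and `TransformBlock` preserves input order by default, a positional n-way join can in principle be written by hand: take one item from each source in turn and forward the resulting array. The sketch below relies on exactly that guarantee. `ZipAsync`, `RunDemoAsync` and the doubling/negating transforms are hypothetical names for illustration, not part of TPL Dataflow, and the approach has not been benchmarked against `JoinBlock`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NWayJoin
{
    // Hypothetical helper: repeatedly receives one item from every source, in source
    // order, and sends them to the target as one array. Assumes every source emits
    // exactly one item per input, so the k-th items of all sources belong together.
    // The sources must not be linked to any other target.
    public static async Task ZipAsync<T>(IReadOnlyList<ISourceBlock<T>> sources, ITargetBlock<T[]> target)
    {
        try
        {
            // OutputAvailableAsync returns false once the first source is done and empty.
            while (await sources[0].OutputAvailableAsync())
            {
                var items = new T[sources.Count];
                for (int k = 0; k < sources.Count; k++)
                    items[k] = await sources[k].ReceiveAsync();
                await target.SendAsync(items);
            }
            // Rethrows if any source faulted, so the fault reaches the target.
            await Task.WhenAll(sources.Select(s => s.Completion));
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex);
        }
    }
}

public static class Program
{
    public static async Task<List<int[]>> RunDemoAsync(int count)
    {
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        var broadcast = new BroadcastBlock<int>(i => i);
        var doubler = new TransformBlock<int, int>(i => i * 2);
        var negator = new TransformBlock<int, int>(i => -i);
        var results = new List<int[]>();
        var collector = new ActionBlock<int[]>(arr => results.Add(arr)); // single-threaded by default
        broadcast.LinkTo(doubler, propagate);
        broadcast.LinkTo(negator, propagate);

        Task zip = NWayJoin.ZipAsync<int>(new ISourceBlock<int>[] { doubler, negator }, collector);
        for (int i = 1; i <= count; i++)
            broadcast.Post(i);
        broadcast.Complete();

        await Task.WhenAll(zip, collector.Completion);
        return results;
    }

    public static async Task Main()
    {
        foreach (int[] pair in await RunDemoAsync(3))
            Console.WriteLine(string.Join(", ", pair)); // prints "2, -1", then "4, -2", then "6, -3"
    }
}
```

Each array is ordered by source position, which sidesteps the ordering problem of batching, at the cost of one sequential consumer for all sources.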
Edit: Requested clarification:
Per one of my previous questions, I set up my JoinBlocks as follows:
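```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class Test
{
    BroadcastBlock<int> broadCastBlock;
    TransformBlock<int, int> transformBlock1, transformBlock2;
    JoinBlock<int, int> joinBlock;
    ActionBlock<Tuple<int, int>> processorBlock;
    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i => i); // cloning function: ints are copied as-is
        transformBlock1 = new TransformBlock<int, int>(i => i);
        transformBlock2 = new TransformBlock<int, int>(i => i);
        joinBlock = new JoinBlock<int, int>();
        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target1); // each transform block feeds one join input
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = Stopwatch.StartNew();
        const int numElements = 1000000;
        for (int i = 1; i <= numElements; i++)
            broadCastBlock.Post(i);
        ////mark completion
        broadCastBlock.Complete();
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
        processorBlock.Completion.Wait(); // wait for the pipeline to drain before reading the timer
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000.0 / watch.ElapsedMilliseconds);
    }
}
```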
One way to do this is to use `BatchBlock` with `Greedy` set to `false`. In this configuration, the block doesn't do anything until there are n items from n different blocks waiting to be consumed (where n is the batch size you set when creating the `BatchBlock`). When that happens, it consumes all n items at once and produces an array containing all of them.
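A minimal sketch of that setup, assuming a .NET runtime where System.Threading.Tasks.Dataflow is available. `RunAsync`, the `i * 10 + k` transforms and the item counts are made up for illustration. As in the question's JoinBlock code, the batch block is completed manually once every transform block has finished, because propagating completion from whichever source finishes first could complete the batch too early.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Joins the outputs of sourceCount transform blocks with a non-greedy BatchBlock.
    public static async Task<List<int[]>> RunAsync(int sourceCount, int itemCount)
    {
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        var broadcast = new BroadcastBlock<int>(i => i);

        // Greedy = false: offers are postponed and only consumed once
        // sourceCount distinct sources each have a message waiting.
        var batch = new BatchBlock<int>(sourceCount,
            new GroupingDataflowBlockOptions { Greedy = false });

        // Hypothetical transforms: source k maps i to i * 10 + k.
        TransformBlock<int, int>[] transforms = Enumerable.Range(0, sourceCount)
            .Select(k => new TransformBlock<int, int>(i => i * 10 + k))
            .ToArray();
        foreach (var t in transforms)
        {
            broadcast.LinkTo(t, propagate);
            t.LinkTo(batch); // no PropagateCompletion here, see the lead-in
        }

        var results = new List<int[]>();
        var collector = new ActionBlock<int[]>(arr => results.Add(arr));
        batch.LinkTo(collector, propagate);

        for (int i = 1; i <= itemCount; i++)
            broadcast.Post(i);
        broadcast.Complete();

        // Complete the batch only after every source block is done.
        await Task.WhenAll(transforms.Select(t => t.Completion));
        batch.Complete();
        await collector.Completion;
        return results;
    }

    public static async Task Main()
    {
        // Element order inside each printed array is not guaranteed.
        foreach (int[] arr in await RunAsync(sourceCount: 3, itemCount: 2))
            Console.WriteLine(string.Join(", ", arr));
    }
}
```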
One caveat with this solution is that the resulting array is not ordered by source: you're not going to know which item came from which source. I also have no idea how its performance compares with `JoinBlock`; you'll have to test that yourself. (Though I would understand if using `BatchBlock` this way were slower, because of the overhead of non-greedy consumption.)
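If the source of each item matters, one possible workaround (not part of the answer above, and not benchmarked) is to have each transform block tag its output with its own index and to reorder every array after the `BatchBlock`. The sketch below shows only the reordering step; the `(int Source, int Value)` tuple shape, the `Transform` function mentioned in the comment, and the `CreateReorderBlock`/`ReorderAsync` names are assumptions for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Assumption: each of the n transform blocks tags its output with its own index k,
    // e.g. new TransformBlock<int, (int Source, int Value)>(i => (k, Transform(i))),
    // where Transform is a hypothetical per-source function. After the non-greedy
    // BatchBlock, this block sorts every array by that tag and strips it off.
    public static TransformBlock<(int Source, int Value)[], int[]> CreateReorderBlock() =>
        new TransformBlock<(int Source, int Value)[], int[]>(batch =>
            batch.OrderBy(item => item.Source).Select(item => item.Value).ToArray());

    // Runs a single batch through the reorder block.
    public static async Task<int[]> ReorderAsync((int Source, int Value)[] batch)
    {
        var reorder = CreateReorderBlock();
        reorder.Post(batch);
        reorder.Complete();
        return await reorder.ReceiveAsync();
    }

    public static async Task Main()
    {
        // Simulates a batch whose items arrived in an arbitrary order.
        int[] ordered = await ReorderAsync(new[] { (2, 30), (0, 10), (1, 20) });
        Console.WriteLine(string.Join(", ", ordered)); // prints "10, 20, 30"
    }
}
```

In a full pipeline, the reorder block would simply be linked after the `BatchBlock`; the extra tuple per item adds some allocation-free overhead (value tuples) plus one sort per batch.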