Tags: c#, task-parallel-library, tpl-dataflow

TPL Dataflow create aggregated result array from all incoming nodes (multiple producer, 1 consumer)


Please see the following code sample. I need an aggregator block that can be linked to any number of sources, waits for every source to send one message, and then combines those messages into a result array (string[]).

This should be obvious and straightforward, but somehow I cannot find a solution. I checked JoinBlock and TransformBlock, but neither seems to fit.
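To illustrate why JoinBlock falls short: it works when the number of sources is fixed at compile time (the only variants are JoinBlock<T1, T2> and JoinBlock<T1, T2, T3>), but it cannot accept an arbitrary number of inputs. A minimal sketch for exactly two producers follows; the names JoinExample and RunAsync are hypothetical, and the TransformBlock only converts the joined tuple into the string[] shape asked for here.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JoinExample
{
    // Hypothetical helper: joins exactly two producers and returns the combined line.
    public static async Task<string> RunAsync()
    {
        var p1 = new BufferBlock<string>();
        var p2 = new BufferBlock<string>();

        // JoinBlock<T1, T2> waits until both Target1 and Target2 hold a message,
        // then emits them together as a Tuple<T1, T2>.
        var join = new JoinBlock<string, string>();
        p1.LinkTo(join.Target1);
        p2.LinkTo(join.Target2);

        // Convert the tuple into a string[]; this step only works because the arity is fixed.
        var toArray = new TransformBlock<Tuple<string, string>, string[]>(
            t => new[] { t.Item1, t.Item2 });
        join.LinkTo(toArray);

        p1.Post("Produce 1.1");
        p2.Post("Produce 2.1");

        string[] result = await toArray.ReceiveAsync();
        return string.Join(", ", result);
    }

    public static async Task Main()
    {
        // Prints "Produce 1.1, Produce 2.1".
        Console.WriteLine(await RunAsync());
    }
}
```

Adding a third or fourth producer means changing the JoinBlock type and the tuple conversion, and beyond three inputs there is no JoinBlock variant at all, which is why it does not fit the "any number of sources" requirement.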

using System;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp2
{
    internal class Program
    {
        private static readonly uint _produceCount = 0;
        private static void Main(string[] args)
        {

            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();

            // a block is required that accepts n sources as input, waits for all inputs to arrive, and then creates a result array from all inputs

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(String.Join(", ", inputs));
            });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // desired output:
            // "Produce 1.1, Produce 2.1"
            // (the order does not matter for now)

        }


    }
}

[Edit] Further clarification: I would like a block that dynamically waits for all source nodes (those linked at the point in time the first message arrives) to complete, and then aggregates the results and passes them on to follower nodes.


Solution

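  • You can use a non-greedy BatchBlock for this. Because it is non-greedy, it postpones offered messages instead of buffering them, so each linked source contributes exactly one item to a batch (the batch size must equal the number of sources). This was originally suggested here. Here's a tested example; as proof, note that source1 is sent multiple items, but only its first one shows up in the batch: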

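    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using NUnit.Framework;

    public class DataAggregator
    {
        // Batch size 5 matches the five linked sources; Greedy = false makes the block
        // take one postponed message per source instead of buffering everything it is offered.
        private BatchBlock<string> batchBlock = new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });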
        private ActionBlock<string[]> writer = new ActionBlock<string[]>(strings => strings.ToList().ForEach(str => Console.WriteLine(str)));
        private BufferBlock<string> source1 = new BufferBlock<string>();
        private BufferBlock<string> source2 = new BufferBlock<string>();
        private BufferBlock<string> source3 = new BufferBlock<string>();
        private BufferBlock<string> source4 = new BufferBlock<string>();
        private BufferBlock<string> source5 = new BufferBlock<string>();
    
        public DataAggregator()
        {
            source1.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            source2.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            source3.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            source4.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            source5.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
        }
    
        [Test]
        public async Task TestPipeline()
        {
            source1.Post("string1-1");
            source1.Post("string1-2");
            source1.Post("string1-3");
            source2.Post("string2-1");
            source3.Post("string3-1");
            source4.Post("string4-1");
            source5.Post("string5-1");
            //Should print string1-1 string2-1 string3-1 string4-1 string5-1
            source1.Complete();
            source2.Complete();
            source3.Complete();
            source4.Complete();
            source5.Complete();
            await writer.Completion;
        }
    }
    

    Output:

    string1-1
    string2-1
    string3-1
    string4-1
    string5-1
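The same idea can be wrapped in a helper that takes an arbitrary list of sources, applied here to the two producers from the question. This is a sketch under stated assumptions: AggregatorSketch and CreateAggregator are hypothetical names, the batch size is fixed to the number of sources at creation time (so it does not cover sources linked later, as described in the question's edit), completion is not propagated, and an empty source list would make the BatchBlock constructor throw because the batch size must be at least 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AggregatorSketch
{
    // Hypothetical helper: links every source to one non-greedy BatchBlock whose
    // batch size equals the number of sources, so each batch holds one item per source.
    public static BatchBlock<T> CreateAggregator<T>(IReadOnlyList<ISourceBlock<T>> sources)
    {
        var batch = new BatchBlock<T>(
            sources.Count,
            new GroupingDataflowBlockOptions { Greedy = false });

        foreach (var source in sources)
        {
            source.LinkTo(batch);
        }
        return batch;
    }

    public static async Task<string> RunAsync()
    {
        var p1 = new BufferBlock<string>();
        var p2 = new BufferBlock<string>();
        var aggregator = CreateAggregator(new ISourceBlock<string>[] { p1, p2 });

        p1.Post("Produce 1.1");
        p1.Post("Produce 1.2"); // stays in p1: the batch takes only one item per source
        p2.Post("Produce 2.1");

        string[] batch = await aggregator.ReceiveAsync();
        // Sort only to make the printed order deterministic.
        return string.Join(", ", batch.OrderBy(s => s));
    }

    public static async Task Main()
    {
        // Prints "Produce 1.1, Produce 2.1".
        Console.WriteLine(await RunAsync());
    }
}
```

Sorting the batch is optional; the question says the order does not matter, and it is only there so the output is stable from run to run.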