Tags: c#, concurrency, tpl-dataflow

TPL Dataflow block that modifies state and sends a single message after it completes


I just started learning TPL Dataflow and have a question about the following pipeline:

  1. "Blocks" 1, 2, and 3 hold references to states. They modify the states and send a message downstream each time they receive one. The number of such blocks varies.

  2. The "Aggregator" receives messages from the Blocks and checks all of them for errors. After all source blocks have completed, the aggregator passes a single message to the "Releaser".

  3. The "Releaser" holds a reference to the state. It learns from the "Aggregator" whether the update was done correctly, and then sends the state downstream together with either a success message or a failure message.

    using System.Linq;
    using System.Threading.Tasks.Dataflow;

    public static void Run()
    {
        var sourceBlock1 = new TransformBlock<int, int>(x => x * 2);
        var sourceBlock2 = new TransformBlock<int, int>(x => x * 3);

        // How do I implement an aggregator that collects messages from an unknown
        // number of sources and emits a single message once all sources are complete?
        var aggregater = new TransformBlock<int, int[]>(x => ?);
        var releaser = new TransformBlock<int[], int>(xs => xs.Sum());

        sourceBlock1.LinkTo(aggregater);
        sourceBlock2.LinkTo(aggregater);
        aggregater.LinkTo(releaser);

        sourceBlock1.Post(10);
        sourceBlock2.Post(20);

        // Wait for the last block of the pipeline.
        releaser.Completion.Wait();
    }
    

Solution

  • In this line:

    sourceBlock1.LinkTo(aggregater);
    

    ...the aggregater receives no notification that it has become the linked target of sourceBlock1. The ISourceBlock<TOutput>.LinkTo method changes only the state of the source, not of the target. The target becomes aware that it has been linked only when it is offered its first message, via the ITargetBlock<TInput>.OfferMessage method:

    public DataflowMessageStatus OfferMessage (
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept);
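    A quick way to observe this is a target that records the source argument of every OfferMessage call. The sketch below uses a hypothetical SpyTarget<T> and LinkAwarenessDemo (neither is part of TPL Dataflow) and assumes a .NET version where System.Threading.Tasks.Dataflow is available. Right after LinkTo the spy has recorded nothing; it only sees an offer once a message flows. When the link is created with a predicate, the recorded source is, in the current implementation as far as I know, an internal filtering proxy rather than sourceBlock1 itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: a minimal target that records the `source` argument of
// every OfferMessage call, so we can see when (and from whom) it learns of a link.
public sealed class SpyTarget<T> : ITargetBlock<T>
{
    private readonly TaskCompletionSource<bool> _completion = new TaskCompletionSource<bool>();
    private readonly List<ISourceBlock<T>> _sources = new List<ISourceBlock<T>>();

    public IReadOnlyList<ISourceBlock<T>> Sources
    {
        get { lock (_sources) return _sources.ToArray(); }
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        if (_completion.Task.IsCompleted) return DataflowMessageStatus.DecliningPermanently;
        lock (_sources) _sources.Add(source); // source may be null (e.g. Post)

        if (consumeToAccept)
        {
            // The source requires the target to claim the message explicitly.
            source.ConsumeMessage(messageHeader, this, out bool consumed);
            if (!consumed) return DataflowMessageStatus.NotAvailable;
        }
        return DataflowMessageStatus.Accepted;
    }

    public void Complete() => _completion.TrySetResult(true);
    public void Fault(Exception exception) => _completion.TrySetException(exception);
    public Task Completion => _completion.Task;
}

public static class LinkAwarenessDemo
{
    // Returns: offers seen right after LinkTo, offers seen after a message flowed,
    // and whether the first recorded source is sourceBlock1 itself.
    public static (int AfterLink, int AfterPost, bool SameReference) Run(bool withPredicate)
    {
        var sourceBlock1 = new TransformBlock<int, int>(x => x * 2);
        var spy = new SpyTarget<int>();

        if (withPredicate) sourceBlock1.LinkTo(spy, x => true); // filtered link
        else sourceBlock1.LinkTo(spy);

        int afterLink = spy.Sources.Count; // LinkTo has not called the target at all

        sourceBlock1.Post(10);
        sourceBlock1.Complete();
        sourceBlock1.Completion.Wait(); // output buffer is empty, so the spy accepted 20

        var sources = spy.Sources;
        return (afterLink, sources.Count, ReferenceEquals(sources[0], sourceBlock1));
    }
}
```

    With a plain link the recorded source is usually sourceBlock1, but that is an implementation detail rather than a documented guarantee.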
    

    Even then, it's not guaranteed that the source argument will be a reference to sourceBlock1, since sourceBlock1 can opt to interpose an internal proxy ISourceBlock<TInput> implementation between itself and its linked target. The argument can also be null, for example when the message was sent with Post. So I don't think you can achieve what you want using only the methods of the existing interfaces. Instead, you could equip your custom aggregator block with an API that provides bidirectional link awareness. For example:

    aggregater.Observe(sourceBlock1);
    aggregater.Observe(sourceBlock2);
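    Below is a minimal sketch of what such an Observe API might look like. ObservingAggregator<T>, Observe, Seal and Output are all hypothetical names, not TPL Dataflow members. Observe links a source and remembers its Completion task; because the aggregator cannot tell when the set of sources is final, the sketch assumes an explicit Seal call after the last Observe. Once every observed source has completed, a single T[] is emitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical custom aggregator with bidirectional link awareness.
public sealed class ObservingAggregator<T>
{
    private readonly List<T> _items = new List<T>();
    private readonly List<Task> _sourceCompletions = new List<Task>();
    private readonly BufferBlock<T[]> _output = new BufferBlock<T[]>();
    private readonly ActionBlock<T> _input;
    private bool _sealed; // assumes Observe/Seal are called from one thread during setup

    public ObservingAggregator()
    {
        // ActionBlock runs one item at a time by default, so _items needs no lock.
        _input = new ActionBlock<T>(item => _items.Add(item));
        _input.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)_output).Fault(t.Exception);
            else { _output.Post(_items.ToArray()); _output.Complete(); }
        }, TaskScheduler.Default);
    }

    public ISourceBlock<T[]> Output => _output;

    // Links the source to this aggregator and remembers its completion.
    public void Observe(ISourceBlock<T> source)
    {
        if (_sealed) throw new InvalidOperationException("No more sources can be observed.");
        source.LinkTo(_input);
        _sourceCompletions.Add(source.Completion);
    }

    // Declares the set of sources final; input completes when all of them complete.
    public void Seal()
    {
        _sealed = true;
        Task.WhenAll(_sourceCompletions).ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)_input).Fault(t.Exception);
            else _input.Complete();
        }, TaskScheduler.Default);
    }
}

public static class AggregatorDemo
{
    public static int Run()
    {
        var sourceBlock1 = new TransformBlock<int, int>(x => x * 2);
        var sourceBlock2 = new TransformBlock<int, int>(x => x * 3);
        var aggregater = new ObservingAggregator<int>();
        var releaser = new TransformBlock<int[], int>(xs => xs.Sum());

        aggregater.Observe(sourceBlock1);
        aggregater.Observe(sourceBlock2);
        aggregater.Seal();
        aggregater.Output.LinkTo(releaser);

        sourceBlock1.Post(10);
        sourceBlock2.Post(20);
        sourceBlock1.Complete();
        sourceBlock2.Complete();

        return releaser.Receive(); // 20 + 60
    }
}
```

    AggregatorDemo.Run() should return 80. A TransformBlock completes only after its output buffer has been drained, so by the time all observed completions are done, every message has already been handed to the aggregator's input.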
    

    As for how to propagate the completion of multiple blocks to a single target block, take a look at these links:
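    For reference, a common way to complete one target after several sources is to wait on Task.WhenAll over their Completion tasks. The sketch below wraps that in a hypothetical PropagateCompletion helper. As the aggregator it swaps in the built-in BatchBlock<T>, which flushes its remaining items as a final, smaller batch when it is completed. It assumes fewer than 1000 messages in total, so that final flush is the only array emitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Hypothetical helper: completes `target` once every block in `sources`
    // has completed, or faults it if any of them faulted.
    public static void PropagateCompletion(IEnumerable<IDataflowBlock> sources, IDataflowBlock target)
    {
        Task.WhenAll(sources.Select(s => s.Completion)).ContinueWith(t =>
        {
            if (t.IsFaulted) target.Fault(t.Exception);
            else target.Complete();
        }, TaskScheduler.Default);
    }

    public static int Run()
    {
        var sourceBlock1 = new TransformBlock<int, int>(x => x * 2);
        var sourceBlock2 = new TransformBlock<int, int>(x => x * 3);

        // Assumption: fewer than 1000 messages, so only the final flush is emitted.
        var aggregater = new BatchBlock<int>(1000);
        var releaser = new TransformBlock<int[], int>(xs => xs.Sum());

        sourceBlock1.LinkTo(aggregater);
        sourceBlock2.LinkTo(aggregater);
        aggregater.LinkTo(releaser, new DataflowLinkOptions { PropagateCompletion = true });
        PropagateCompletion(new IDataflowBlock[] { sourceBlock1, sourceBlock2 }, aggregater);

        sourceBlock1.Post(10);
        sourceBlock2.Post(20);
        sourceBlock1.Complete();
        sourceBlock2.Complete();

        return releaser.Receive(); // 20 + 60
    }
}
```

    Unlike the Observe sketch, this keeps the aggregator a stock block and moves the knowledge of "which sources exist" into the code that wires the pipeline.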