Search code examples
c#.nettpl-dataflow

In a JoinBlock, receive a Target when the other Target is filled


I'm connecting a JoinBlock to a WriteOnceBlock and a BufferBlock to fill Targets 1 and 2. My goal is that every time the JoinBlock receives a message from the BufferBlock , it also requests the value that the WriteOnceBlock is holding.

My first guess was that I could add a ContinueWith delegate to the Target2 Completion event, but that's not quite right - I need to attach to something like a Filled event that doesn't seem to exist.

I also tried using the join block in non-greedy mode as a last-ditch effort, but that didn't change the output.

Am I missing something obvious here?

Example:

static void Main(string[] args)
    {
        var writeOnceBlockTest = new WriteOnceBlock<int>(i => i);
        var queueBlockTest = new BufferBlock<string>();
        var joinBlockTest = new JoinBlock<int, string>();
        var actionBlockTest = new ActionBlock<System.Tuple<int, string>>(tuple => Console.WriteLine($"I received int {tuple.Item1} and string {tuple.Item2}."));

        writeOnceBlockTest.LinkTo(joinBlockTest.Target1);
        queueBlockTest.LinkTo(joinBlockTest.Target2, new DataflowLinkOptions{PropagateCompletion = true});
        joinBlockTest.LinkTo(actionBlockTest, new DataflowLinkOptions { PropagateCompletion = true });

        writeOnceBlockTest.Post(3);
        queueBlockTest.Post("String1");
        queueBlockTest.Post("String2");
        writeOnceBlockTest.Post(4);
        writeOnceBlockTest.Post(5);
        queueBlockTest.Post("String3");
        queueBlockTest.Post("String4");
        queueBlockTest.Complete();
        Console.ReadLine();
    }

Desired output:

I received int 3 and string String1.

I received int 3 and string String2.

I received int 3 and string String3.

I received int 3 and string String4.

Actual output:

I received int 3 and string String1.


Solution

  • The JoinBlock isn't the right choice here although it does seem like the perfect fit. The WriteOnceBlock, as you've found, only offers it's value once. However, you can read that value many times. With this you can use a TransformBlock to get the behavior your looking for.

    public class JoinFlow
    {
        [Test]
        public async Task TestWriteOnceBlock()
        {
            var writeOnceBlockTest = new WriteOnceBlock<int>(i => i);
            var queueBlockTest = new BufferBlock<string>();
            var transformBlockTest = new TransformBlock<string, Tuple<int, string>>(async str => Tuple.Create(await writeOnceBlockTest.ReceiveAsync(), str));
            var actionBlockTest = new ActionBlock<Tuple<int, string>>(tuple => Console.WriteLine($"I received int {tuple.Item1} and string {tuple.Item2}."));
    
            queueBlockTest.LinkTo(transformBlockTest, new DataflowLinkOptions { PropagateCompletion = true });
            transformBlockTest.LinkTo(actionBlockTest, new DataflowLinkOptions { PropagateCompletion = true });
    
            writeOnceBlockTest.Post(3);
            queueBlockTest.Post("String1");
            queueBlockTest.Post("String2");
            writeOnceBlockTest.Post(4);
            writeOnceBlockTest.Post(5);
            queueBlockTest.Post("String3");
            queueBlockTest.Post("String4");
            queueBlockTest.Complete();
            await actionBlockTest.Completion;
        }
    }
    

    Outputs:

    I received int 3 and string String1.
    I received int 3 and string String2.
    I received int 3 and string String3.
    I received int 3 and string String4.