Search code examples
c#tpl-dataflow

Why TPL Dataflow block.LinkTo does not give any output?


I am quite new to the topic TPL Dataflow. In the book Concurrency in C# I tested the following example. I can't figure out why there's no output which should be 2*2-2=2;

static void Main(string[] args)
{
    //Task tt = test();
    Task tt = test1(); 
    Console.ReadLine();
}
static async Task test1()
{
    try
    {
        var multiplyBlock = new TransformBlock<int, int>(item =>
        {
            if (item == 1)
                throw new InvalidOperationException("Blech.");
            return item * 2;
        });
        var subtractBlock = new TransformBlock<int, int>(item => item - 2);
        multiplyBlock.LinkTo(subtractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
        multiplyBlock.Post(2);
        await subtractBlock.Completion;
        int temp = subtractBlock.Receive();
        Console.WriteLine(temp);
    }
    catch (AggregateException e)
    {
        // The exception is caught here.
        foreach (var v in e.InnerExceptions)
        {
            Console.WriteLine(v.Message);
        }

    }

}

Update1: I tried another example. Still I did not use Block.Complete() but I thought when the first block's completed, the result is passed into the second block automatically.

private static async Task test3()
{
    TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] {i, i + 1}; });
    ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));

    tmb.LinkTo(ab);

    for (int i = 0; i < 4; i++)
    {
        tmb.Post(i);
    }
    //tmb.Complete();
    await ab.Completion;
    Console.WriteLine("Finished post");
}

Solution

  • This part of the code:

    await subtractBlock.Completion;
    int temp = subtractBlock.Receive();
    

    is first (asynchronously) waiting for the subtraction block to complete, and then attempting to retrieve an output from the block.

    There are two problems: the source block is never completed, and the code is attempting to retrieve output from a completed block. Once a block has completed, it will not produce any more data.

    (I assume you're referring to the example in recipe 4.2, which will post 1, causing the exception, which completes the block in a faulted state).

    So, you can fix this test by completing the source block (and the completion will propagate along the link to the subtractBlock automatically), and by reading the output before (asynchronously) waiting for subtractBlock to complete:

    multiplyBlock.Complete();
    int temp = subtractBlock.Receive();
    await subtractBlock.Completion;