c#, .net, async-await, task-parallel-library, tpl-dataflow

TransformBlock Items get stuck in the output queue. Why and how to fix?


I have been working with TPL Dataflow and ran into a very irritating problem in code that uses a TransformBlock linked to an ActionBlock.

Eventually I found that items got stuck in the TransformBlock's output queue: its OutputCount property kept returning a value greater than 0, and the whole application deadlocked. However, it unblocks as soon as I call TransformBlock.TryReceiveAll().

Can anyone please tell me whether I missed something, or how to prevent this behavior?

static void Main()
{
    int total = 0;
    int itemsProcessing = 0;

    TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
        i => new Tuple<int, double>(i, Math.Sqrt(i)),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 20,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
        {
            await Task.Delay(1000); // simulating data output delay
            Interlocked.Decrement(ref itemsProcessing);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 5,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    transformBlock.Completion.ContinueWith(t => outputBlock.Complete());

    using (Timer timer = new Timer(o =>
        {
            Console.Title = string.Format(
                "{0}: {1}/{2} {3}/{4}/{5}",
                Assembly.GetExecutingAssembly().GetName().Name,
                Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
                transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
        }, null, 100, 100))
    {
        using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 40; i++)
            {
                Thread.Sleep(100); // simulating new item retrieval delay

                Interlocked.Increment(ref total);
                Interlocked.Increment(ref itemsProcessing);

                transformBlock.SendAsync(i).Wait();
            }
        }

        Console.WriteLine("Enqueued");

        transformBlock.Complete();
        outputBlock.Completion.Wait();

        Console.WriteLine("Finish");

        timer.Change(Timeout.Infinite, Timeout.Infinite);
        timer.Dispose();
    }
}

Solution

  • Invoking TransformBlock.LinkTo gets you back a disposable registration. When you dispose of that registration the blocks unlink.

    Your using scope ends too soon: the blocks are unlinked before the TransformBlock has had a chance to empty itself into the ActionBlock. With items still sitting in its output queue, the TransformBlock cannot complete, and since the first block doesn't complete, the next one doesn't even start completing, let alone finish.

    Moving the Complete() call and the wait on outputBlock.Completion inside the using block solves the deadlock:

    using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        for (int i = 0; i < 40; i++)
        {
            Thread.Sleep(100); // simulating new item retrieval delay
    
            Interlocked.Increment(ref total);
            Interlocked.Increment(ref itemsProcessing);
    
            transformBlock.SendAsync(i).Wait();
        }
    
        Console.WriteLine("Enqueued");
        transformBlock.Complete();
        outputBlock.Completion.Wait();
        Console.WriteLine("Finish");
    }
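
    To see the unlinking effect in isolation, here is a minimal sketch (the UnlinkDemo class and the block names are made up for illustration, not taken from your code). It disposes the link before posting anything, so the item should stay in the TransformBlock's output queue and keep it from completing until the queue is drained by hand, which matches what you observed with TryReceiveAll:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks.Dataflow;

    static class UnlinkDemo
    {
        static void Main()
        {
            var source = new TransformBlock<int, int>(i => i * 2);
            var target = new ActionBlock<int>(i => Console.WriteLine("received " + i));

            // LinkTo returns an IDisposable; disposing it removes the link.
            IDisposable link = source.LinkTo(
                target, new DataflowLinkOptions { PropagateCompletion = true });
            link.Dispose();

            source.Post(1);
            source.Complete();

            // Nothing consumes the output any more, so the source is not
            // expected to complete within the timeout.
            bool completed = source.Completion.Wait(TimeSpan.FromSeconds(1));
            Console.WriteLine("OutputCount: " + source.OutputCount + ", completed: " + completed);

            // Draining the output queue manually lets the source complete.
            IList<int> drained;
            source.TryReceiveAll(out drained);
            source.Completion.Wait();
            Console.WriteLine("Completed after draining");
        }
    }

    Note that the target never sees the item and is never completed here, because both the data and the completion propagate through the link that was disposed.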
    

    As a side note, you really shouldn't be blocking on async code in such a way. It would be much simpler to use async-await instead of Wait(), Task.Delay instead of Thread.Sleep, etc.

    Also, since you're using PropagateCompletion, you don't need to call outputBlock.Complete() explicitly; the transformBlock.Completion.ContinueWith(...) line is redundant.
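
    Putting those notes together, a non-blocking version could look roughly like the sketch below. It assumes C# 7.1 or later for async Main, and it leaves out the counters and the title timer for brevity. The link is simply never disposed, since the pipeline lives for the whole run:

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    static class Program
    {
        static async Task Main()
        {
            var transformBlock = new TransformBlock<int, Tuple<int, double>>(
                i => Tuple.Create(i, Math.Sqrt(i)),
                new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 20,
                    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
                });

            var outputBlock = new ActionBlock<Tuple<int, double>>(
                async tuple => await Task.Delay(1000), // simulating data output delay
                new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 5,
                    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
                });

            // PropagateCompletion forwards completion to outputBlock,
            // so no ContinueWith is needed.
            transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

            for (int i = 0; i < 40; i++)
            {
                await Task.Delay(100); // simulating new item retrieval delay
                await transformBlock.SendAsync(i); // waits asynchronously when the block is full
            }

            Console.WriteLine("Enqueued");

            transformBlock.Complete();
            await outputBlock.Completion;

            Console.WriteLine("Finish");
        }
    }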