c# .net tpl-dataflow

Transform Block with parallelism and bounded capacity postponing message behavior


When a TransformBlock has MaxDegreeOfParallelism > 1 and a finite BoundedCapacity, why does it postpone receiving further messages while a single long-running task is in flight, even though there is capacity in the input queue?

Take the following console application. It creates a TransformBlock with a MaxDegreeOfParallelism = 5 and BoundedCapacity = 5 then feeds it 100 messages. When the block processes message x == 50, it delays that task for 10 seconds.

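using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {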
    if (x == 50)
    {
        Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
        await Task.Delay(10000);
    }
    Console.WriteLine($"processed message {x}");
    return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block

for (int i = 0; i < 100; i++)
{
    Stopwatch blockedTime = Stopwatch.StartNew();
    await DoSomething.SendAsync(i).ConfigureAwait(false);
    blockedTime.Stop();
    Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}

DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();

The results show that the block accepted messages 50-54. Messages 51-54 completed, then the console displayed no output for 10 seconds. Only after message 50 completed did the block accept message 55.

...
Submitted 50    Blocked for 0ms.
Submitted 51    Blocked for 0ms.
processed message 51
Submitted 52    Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53    Blocked for 0ms.
Submitted 54    Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50 
processed message 55
Submitted 55    Blocked for 9998ms.
...

Why does the TransformBlock not keep accepting messages up to its BoundedCapacity of 5 and use the other four degrees of parallelism to continue processing?

An ActionBlock does not display these symptoms and continues processing messages on other available parallel lines.

An unbounded capacity TransformBlock also does not display these symptoms.


Solution

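  • By default, EnsureOrdered is true, so the block must emit its results in the same order it received the inputs. The results for messages 51-54 are ready, but they cannot leave the block until message 50 finishes, and while they wait they still count against BoundedCapacity. With all five slots occupied (one item in progress, four results held back), the block postpones message 55. That is the back pressure you see in your tests.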

    An ActionBlock doesn't exhibit this behavior because it produces no output for another block (it's a dead end, so to speak), so there is nothing to keep in order. Its back pressure is limited only by the bounded capacity and the degree of parallelism.

    From the documentation for the DataflowBlockOptions.EnsureOrdered property:

    By default, dataflow blocks enforce ordering on the processing of messages. Setting EnsureOrdered to false tells a block that it may relax this ordering if it's able to do so. This can be beneficial if making a processed result immediately available is more important than maintaining the input-to-output ordering.

    The fix is to remove the ordering requirement:

    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 5,
        MaxDegreeOfParallelism = 5,
        EnsureOrdered = false
    });
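
To see the difference side by side, here is a minimal sketch that reruns the scenario from the question with EnsureOrdered set to true and then to false, and reports the longest time any single SendAsync call was held back. The helper name LongestSendDelayMs and the 2-second delay (shortened from the question's 10 seconds) are assumptions made for illustration; they are not part of the original code.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not from the question): feeds 100 messages into a bounded,
// parallel TransformBlock and returns the longest wait of any single SendAsync call.
static async Task<long> LongestSendDelayMs(bool ensureOrdered)
{
    var block = new TransformBlock<int, string>(async x =>
    {
        if (x == 50)
        {
            // One slow item; 2 seconds here instead of the question's 10 (assumption).
            await Task.Delay(2000);
        }
        return x.ToString();
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 5,
        MaxDegreeOfParallelism = 5,
        EnsureOrdered = ensureOrdered
    });

    // Drain the output so finished results can leave the block.
    block.LinkTo(DataflowBlock.NullTarget<string>());

    long longest = 0;
    for (int i = 0; i < 100; i++)
    {
        var blockedTime = Stopwatch.StartNew();
        await block.SendAsync(i);
        longest = Math.Max(longest, blockedTime.ElapsedMilliseconds);
    }

    block.Complete();
    await block.Completion;
    return longest;
}

Console.WriteLine($"EnsureOrdered = true:  longest SendAsync wait {await LongestSendDelayMs(true)} ms");
Console.WriteLine($"EnsureOrdered = false: longest SendAsync wait {await LongestSendDelayMs(false)} ms");
```

With ordering on, the longest wait should be roughly the length of the slow item's delay, matching the "Blocked for 9998ms" line in the question. With ordering off, it should stay small because finished results leave the block immediately and free their slots. The trade-off is that results may then arrive downstream in a different order than the inputs.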