When a TransformBlock
has a MaxDegreeOfParallelism > 1
and BoundedCapacity
that isn't unbounded, why does it postpone receiving further messages while there is one long running task despite there being capacity in the input queue?
Take the following console application. It creates a TransformBlock with a MaxDegreeOfParallelism = 5
and BoundedCapacity = 5
then feeds it 100 messages. When the block processes message x == 50
, it delays that task for 10 seconds.
TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
if (x == 50)
{
Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
await Task.Delay(10000);
}
Console.WriteLine($"processed message {x}");
return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block
for (int i = 0; i < 100; i++)
{
Stopwatch blockedTime = Stopwatch.StartNew();
await DoSomething.SendAsync(i).ConfigureAwait(false);
blockedTime.Stop();
Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}
DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();
The results show that messages 50-54 were all received by the block. Messages 51-54 completed, then the console window displays no output for 10 seconds before it displays that message 50 completed and message 55 was able to be received by the block.
...
Submitted 50 Blocked for 0ms.
Submitted 51 Blocked for 0ms.
processed message 51
Submitted 52 Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53 Blocked for 0ms.
Submitted 54 Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50
processed message 55
Submitted 55 Blocked for 9998ms.
...
Why does the Transform Block not continue to fill the block up to the Bounded Capacity of 5, and use the other 4 degrees of parallelism to continue processing messages?
An ActionBlock
does not display these symptoms and continues processing messages on other available parallel lines.
An unbounded capacity TransformBlock
also does not display these symptoms.
Because by-default the parameter EnsureOrdered
is true
, so it's trying to maintain the order of the results. That's to say, it can't continue processing past the BoundedCapacity
because it's required to maintain order, which is the back pressure you see in your tests.
Additionally, an ActionBlock
doesn't exhibit this behavior as it doesn't output to any other block (it's a dead end, so to speak), and as such there is no concept of ordering, back pressure is only limited by the bounded capacity and degree of parallelism.
DataflowBlockOptions.EnsureOrdered Property
By default, dataflow blocks enforce ordering on the processing of messages. Setting
EnsureOrdered
to false tells a block that it may relax this ordering if it's able to do so. This can be beneficial if making a processed result immediately available is more important than maintaining the input-to-output ordering.
The fix is to remove the ordered requirement
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 5 ,
EnsureOrdered = false
});