Tags: c#, .net, tpl-dataflow

BoundedCapacity of linked ActionBlock is not respected


I have a sequential pipeline that consists of two steps.

(simplified example)

The first step simply adds 1000 to the input number. The second step simply displays the number.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = DataflowBlockOptions.Unbounded,
});
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2,
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}

// Signal that no more input is coming and wait for the pipeline to drain;
// otherwise the program can exit before anything is printed.
transformBlock.Complete();
await actionBlock.Completion;

static async Task<long> StepOne(int item)
{
    await Task.Delay(500);
    Console.WriteLine("Transforming: " + item);
    return (long)item + 1000;
}

static async Task StepTwo(long item)
{
    await Task.Delay(1000);
    Console.WriteLine("Final product: " + item);
}

Since step 2 takes longer than step 1, I would expect step 1 to be throttled after a while, because it cannot push its results into the bounded buffer of step 2.

Expected output:
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Final product: 1001
Transforming: 3
Final product: 1002
Transforming: 4
Final product: 1003
...

Actual output:
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Transforming: 3
Final product: 1001
Transforming: 4
Transforming: 5
Final product: 1002
Transforming: 6
Transforming: 7
Final product: 1003
...


Solution

  • A TransformBlock maintains two queues internally, an input queue and an output queue. The size of each can be inspected at any moment through the InputCount and OutputCount properties. The combined size of the two queues is limited by the BoundedCapacity option, so the sum InputCount + OutputCount is always less than or equal to the BoundedCapacity value. In your case the BoundedCapacity of the TransformBlock is Unbounded, so nothing limits how large these two queues can grow (other than some hard limit, probably Int32.MaxValue). The fact that the linked ActionBlock has a limited bounded capacity is mostly irrelevant: its only consequence is that the transformed values are moved later from the output queue of the TransformBlock to the input queue of the ActionBlock. This is observable only if you monitor the OutputCount property of the source block and the InputCount property of the target block. It wouldn't even matter if the TransformBlock were not linked to any target block at all: it would happily keep crunching numbers by itself until some hard limit was hit or the machine ran out of memory.
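Following that explanation, a minimal sketch of one way to get the throttling the question expects: give the TransformBlock its own BoundedCapacity (the value 1 is an illustrative choice, not a recommendation) and feed it with SendAsync instead of Post, because Post on a full bounded block returns false and the item is dropped. The item count and delays are scaled down from the question so the sketch runs quickly, and the `results` list is a hypothetical addition used only to check the output afterwards. After each send it prints InputCount + OutputCount of the TransformBlock, which is the quantity the BoundedCapacity option limits.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical: collected by StepTwo so the output can be checked after completion.
// A plain List is enough because the ActionBlock runs StepTwo one item at a time.
var results = new List<long>();

// Unlike the question's code, the TransformBlock is bounded too.
// BoundedCapacity = 1 is an illustrative value.
var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 1,
});
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2,
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
    // SendAsync waits asynchronously while the TransformBlock is full,
    // whereas Post would return false and the item would be lost.
    if (!await transformBlock.SendAsync(i))
        throw new InvalidOperationException($"Item {i} was declined.");

    Console.WriteLine($"Buffered in TransformBlock: {transformBlock.InputCount + transformBlock.OutputCount}");
}

// No more input; wait until every item has passed through StepTwo.
transformBlock.Complete();
await actionBlock.Completion;

static async Task<long> StepOne(int item)
{
    await Task.Delay(50); // scaled down from the question's 500 ms
    Console.WriteLine($"Transforming: {item}");
    return item + 1000L;
}

async Task StepTwo(long item)
{
    await Task.Delay(100); // scaled down from the question's 1000 ms
    Console.WriteLine($"Final product: {item}");
    results.Add(item);
}
```

With both blocks bounded, the producer loop can only run a few items ahead of step 2 (roughly the sum of the two capacities) before SendAsync starts waiting, so step 1 ends up running at the pace of step 2.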