I'm designing a fairly simple producer-consumer with Dataflow. I need a BatchBlock to batch work for a rather slow consumer (which can only be single-threaded because of the EF DbContext). Everything works as expected, except that my producer produces more messages than the BoundedCapacity I've set. I expected the producer to produce 1000 messages and then wait. What am I doing wrong?
[Test]
public async Task Run()
{
    int producerCount = 0;
    int consumerCount = 0;
    Action logProgress = () =>
    {
        if (consumerCount % 1000 == 0 && consumerCount > 0 || producerCount % 1000 == 0 || consumerCount == producerCount)
        {
            Trace.WriteLine($"Progress - {consumerCount}/{producerCount}");
        }
    };

    var batchBlock = new BatchBlock<int>(500, new GroupingDataflowBlockOptions { BoundedCapacity = 1000 });
    var actionBlock = new ActionBlock<int[]>(async x =>
    {
        await Task.Delay(10);
        foreach (var specificItem in x)
        {
            Interlocked.Increment(ref consumerCount);
        }
        logProgress();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        BoundedCapacity = 1000,
    });
    batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    int[] items = Enumerable.Range(0, 240000).ToArray();
    foreach (var item in items)
    {
        await batchBlock.SendAsync(item);
        Interlocked.Increment(ref producerCount);
        logProgress();
    }
    batchBlock.Complete();
    await Task.WhenAll(batchBlock.Completion, actionBlock.Completion);
}
Output:
Progress - 0/1000
Progress - 0/2000
Progress - 0/3000
Progress - 0/4000
Progress - 0/5000
Progress - 0/6000
Progress - 0/7000
Progress - 0/8000
Progress - 0/9000
Progress - 0/10000
Progress - 0/11000
...
Progress - 31000/240000
Progress - 31500/240000
Progress - 32000/240000
Progress - 32500/240000
Progress - 33000/240000
Progress - 33500/240000
...
240,000 items, batched in batches of 500 items each, can be arranged in 480 batches. These are fewer than the 1,000-message limit imposed by the BoundedCapacity policy of the ActionBlock<int[]> (whose unit is whole int[] batches, not individual items), so this policy has no observable effect with the given workload.
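To make the unit mismatch concrete, here is the arithmetic spelled out (the variable names are purely illustrative):

```csharp
int totalItems = 240_000;
int batchSize  = 500;

int batchCount = totalItems / batchSize; // 480 batches in total

// The ActionBlock<int[]>'s BoundedCapacity of 1,000 counts messages,
// and each message is a whole batch, so in terms of items it allows:
int actionBlockCapacityInItems = 1000 * batchSize; // 500,000 items

// 480 batches < 1,000 message capacity: the ActionBlock never has to
// postpone a single batch during the entire run.
```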
Specifically, the ActionBlock<int[]> always responds with DataflowMessageStatus.Accepted when the BatchBlock<int> invokes its OfferMessage method. It never responds with Postponed, which is the behavior of an ITargetBlock<TInput> block that has reached the limit of its capacity. So the BoundedCapacity policy of the BatchBlock<int> has no effect either, because the BatchBlock<int> is always able to immediately propagate the batches it creates to its linked target, keeping its own buffer nearly empty.
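One way to get the intended backpressure, assuming the goal is to stall the producer after roughly 1,000–2,000 items: cap the ActionBlock<int[]> at a small number of *batches*, so that it starts postponing, the BatchBlock<int> fills up, and SendAsync asynchronously waits. This is a sketch with illustrative capacity values, not the only correct configuration:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BackpressureDemo
{
    static async Task Main()
    {
        int producerCount = 0;
        int consumerCount = 0;
        int maxLead = 0; // how far the producer gets ahead of the consumer

        var batchBlock = new BatchBlock<int>(500, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = 1000 // measured in individual ints
        });
        var actionBlock = new ActionBlock<int[]>(async batch =>
        {
            await Task.Delay(10); // simulated slow, single-threaded consumer
            Interlocked.Add(ref consumerCount, batch.Length);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2 // measured in int[] batches: 2 × 500 = 1,000 items
        });
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in Enumerable.Range(0, 20_000))
        {
            // Waits once both blocks are at capacity, instead of racing ahead.
            await batchBlock.SendAsync(item);
            producerCount++;
            maxLead = Math.Max(maxLead, producerCount - Volatile.Read(ref consumerCount));
        }
        batchBlock.Complete();
        await actionBlock.Completion;

        Console.WriteLine($"Max producer lead: {maxLead}");
    }
}
```

With these settings the producer can run at most about 2,000 items ahead of the consumer (1,000 buffered in the BatchBlock plus 1,000 held by the ActionBlock), rather than buffering the entire input.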