Tags: c#, tpl-dataflow

Dataflow with BatchBlock - producer runs ahead


I'm designing a pretty simple producer-consumer pipeline with Dataflow. I need a BatchBlock to batch work for a rather slow consumer (which can only be single-threaded because of an EF DbContext). Everything works as expected, except that my producer produces many more messages than the BoundedCapacity I've set. I was expecting the producer to produce 1000 messages and then wait. What am I doing wrong?

    [Test]
    public async Task Run()
    {
        int producerCount = 0;
        int consumerCount = 0;

        Action logProgress = () =>
        {
            if (consumerCount % 1000 == 0 && consumerCount > 0 || producerCount % 1000 == 0 || consumerCount == producerCount)
            {
                Trace.WriteLine($"Progress - {consumerCount}/{producerCount}");
            }
        };

        var batchBlock = new BatchBlock<int>(500, new GroupingDataflowBlockOptions { BoundedCapacity = 1000 });
        var actionBlock = new ActionBlock<int[]>(async x =>
        {
            await Task.Delay(10);
            foreach (var specificItem in x)
            {
                Interlocked.Increment(ref consumerCount);
            }
            logProgress();
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 1000,
        });
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        int[] items = Enumerable.Range(0, 240000).ToArray();
        foreach (var item in items)
        {
            await batchBlock.SendAsync(item);
            Interlocked.Increment(ref producerCount);
            logProgress();
        }

        batchBlock.Complete();

        await Task.WhenAll(batchBlock.Completion, actionBlock.Completion);
    }

Output:

Progress - 0/1000
Progress - 0/2000
Progress - 0/3000
Progress - 0/4000
Progress - 0/5000
Progress - 0/6000
Progress - 0/7000
Progress - 0/8000
Progress - 0/9000
Progress - 0/10000
Progress - 0/11000
...
Progress - 31000/240000
Progress - 31500/240000
Progress - 32000/240000
Progress - 32500/240000
Progress - 33000/240000
Progress - 33500/240000
...

Solution

  • 240,000 items, batched in groups of 500, yield 480 batches. That is fewer than the 1,000-message limit imposed by the BoundedCapacity option of the ActionBlock<int[]>, so that limit has no observable effect with the given workload.

    Specifically, the ActionBlock<int[]> always responds with DataflowMessageStatus.Accepted when the BatchBlock<int> invokes its OfferMessage method. It never responds with Postponed, which is how an ITargetBlock<TInput> behaves once it has reached its capacity limit. So the BoundedCapacity option of the BatchBlock<int> has no effect either, because the BatchBlock<int> can always propagate the batches it creates immediately to its linked target. A sketch of one possible adjustment follows.
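
    If the intent is for the producer to pause after roughly a thousand pending items, one adjustment is to express the BoundedCapacity of the ActionBlock<int[]> in batches rather than in items, since every message it receives is an int[] of 500 elements. Below is a minimal sketch of that idea; the capacity of 2 batches and the class name BoundedBatchingExample are assumptions chosen for illustration, keeping roughly 2,000 items in flight (about 1,000 buffered in the BatchBlock<int> plus a couple of 500-item batches in the ActionBlock<int[]>).

        using System;
        using System.Linq;
        using System.Threading;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;

        public static class BoundedBatchingExample
        {
            public static async Task Run()
            {
                int producerCount = 0;
                int consumerCount = 0;

                // Counted in individual int items waiting to be batched.
                var batchBlock = new BatchBlock<int>(500,
                    new GroupingDataflowBlockOptions { BoundedCapacity = 1000 });

                // Counted in int[] messages: 2 batches = 1,000 items, not 2 items.
                var actionBlock = new ActionBlock<int[]>(async batch =>
                {
                    await Task.Delay(10); // simulated slow, single-threaded consumer
                    Interlocked.Add(ref consumerCount, batch.Length);
                }, new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2,
                });

                batchBlock.LinkTo(actionBlock,
                    new DataflowLinkOptions { PropagateCompletion = true });

                foreach (var item in Enumerable.Range(0, 240_000))
                {
                    // Awaits once the BatchBlock is full and the ActionBlock
                    // has postponed further batches.
                    await batchBlock.SendAsync(item);
                    Interlocked.Increment(ref producerCount);
                }

                batchBlock.Complete();
                await Task.WhenAll(batchBlock.Completion, actionBlock.Completion);

                Console.WriteLine($"Produced {producerCount}, consumed {consumerCount}");
            }
        }

    The key point is that the two blocks count capacity in different units: the BatchBlock<int> counts individual int items, while the linked ActionBlock<int[]> counts whole batches.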