Tags: c#, .net, task, task-parallel-library, tpl-dataflow

await SendAsync does not await on TPL Dataflow BatchBlock


The example program creates the following BatchBlock: new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });. It sends 60 int items to the block, and a separate task consumes them.

The issue is that await sourceBlock.SendAsync(i); doesn't seem to await: even after the BatchBlock's bounded capacity should have been reached, items keep being sent without the consuming task taking any out first. In the end, only 2 batches of 10 int items come out of the BatchBlock. I would expect await sourceBlock.SendAsync(i); to pause execution once 20 items have been sent, since the bounded capacity of the block is set to 10 with a maximum of 2 groups. At some point the consuming task would then receive the data, and the process would repeat.

The code is below. To reproduce, create a simple console app and add the following to Main:

BatchBlockIssueReplication().GetAwaiter().GetResult();

The method to call:

    public static async Task BatchBlockIssueReplication()
    {
        var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });

        // Reading data from the source block
        Task fireAndForget = Task.Run(async () =>
        {
            while (!sourceBlock.Completion.IsCanceled)
            {
                await Task.Delay(1500);
                if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))
                {
                    Console.WriteLine("Received: ");
                    foreach (var result in results)
                    {
                        Console.Write($"{result.Length} ");
                    }
                    Console.WriteLine();
                }
            }
        });

        for (int i = 0; i < 60; i++)
        {
            Console.WriteLine($"Sending {i} to the source block");
            await sourceBlock.SendAsync(i);
        }
        Console.WriteLine("Finished sending data to the source block");

        await Task.Delay(10000);
    }

Solution

  • You haven't set BoundedCapacity, which controls how many items can wait in the input buffer. SendAsync only awaits once that limit is reached.

    What you set instead is the MaxNumberOfGroups property, which is how many groups the block will generate before it refuses to receive any further input.

    From the docs:

    Gets or sets the maximum number of groups that should be generated by the block.

    If you want your block to hold e.g. 20 items in the input buffer and make senders wait, you should set BoundedCapacity:

    var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions 
                                              { 
                                                  BoundedCapacity = 20 
                                              });
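
To see the difference between the two options side by side, here is a minimal sketch. The class and method names (BatchBlockOptionsDemo, CountAcceptedWithMaxGroupsAsync, ProbeBoundedCapacityAsync) are made up for illustration and are not part of the original question. It assumes the behaviour described above: with MaxNumberOfGroups the block declines further input and SendAsync completes with false rather than waiting, while with BoundedCapacity the SendAsync task stays pending until a batch is taken out.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockOptionsDemo
{
    // MaxNumberOfGroups = 2: once two batches of 10 can be produced, the block
    // declines further input. SendAsync then completes with false instead of
    // waiting, so checking its result shows how many items were really accepted.
    public static async Task<int> CountAcceptedWithMaxGroupsAsync()
    {
        var block = new BatchBlock<int>(10,
            new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });

        int accepted = 0;
        for (int i = 0; i < 60; i++)
        {
            if (await block.SendAsync(i))
            {
                accepted++;
            }
        }
        return accepted;
    }

    // BoundedCapacity = 20: the block holds at most 20 items. The 21st
    // SendAsync stays pending until a consumer removes a batch.
    public static async Task<(bool WasPending, bool Accepted, int BatchLength)> ProbeBoundedCapacityAsync()
    {
        var block = new BatchBlock<int>(10,
            new GroupingDataflowBlockOptions { BoundedCapacity = 20 });

        // Fill the block up to its capacity.
        for (int i = 0; i < 20; i++)
        {
            await block.SendAsync(i);
        }

        // Do not await yet: this send cannot complete while the block is full.
        Task<bool> pendingSend = block.SendAsync(20);
        await Task.Delay(200);
        bool wasPending = !pendingSend.IsCompleted;

        // Taking one batch out frees room for 10 more items,
        // which lets the postponed send go through.
        int[] batch = await block.ReceiveAsync();
        bool accepted = await pendingSend;

        return (wasPending, accepted, batch.Length);
    }
}
```

Both methods can be called from Main the same way as in the question, e.g. BatchBlockOptionsDemo.ProbeBoundedCapacityAsync().GetAwaiter().GetResult();. Checking the bool returned by SendAsync is also a cheap way to notice that a block has stopped accepting input.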