How to ensure BatchBlock has completed


I'm trying to understand BatchBlock, as it seems to be what I'll need for a project I'm working on. However, I'm getting hung up on one part in particular: how to ensure the batch has completed. Take the following code, for example. It has a batch size of 5, and five values are posted to it.

using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

namespace sandbox
{
    class Program
    {

        public static void Main(string[] args)
        {
            BatchBlock<int> batchBlock = new BatchBlock<int>(5);
            ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(OutputAverage);

            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            batchBlock.Post(1);
            batchBlock.Post(2);
            batchBlock.Post(3);
            batchBlock.Post(4);
            batchBlock.Post(5);

            batchBlock.Complete();
            batchBlock.Completion.Wait();
        }

        private static void OutputAverage(int[] values) =>
            Console.WriteLine("The average is: " + values.Average());

    }
}

If I run the above code, nothing is output to the console window. This is because the program execution is finishing before the batch has had time to process. If I add a super small Thread.Sleep(100); at the end, then that is enough of a delay for it to complete the batch and output the expected text to the console.

There are other solutions I've thought of, including the classic Console.ReadLine(), but ultimately I need this to be something that can be called and runs until completion, with all batches properly processed, without someone having to watch it and hit Enter to close it out.

The actual project I want to use this in reads a large (>4GB) binary-encoded file and processes it a chunk at a time to parse records. The BatchBlock part will take batches of the parsed records and output them to JSON files in parallel with the parsing. Currently everything works as expected, except that with my test files, which are small and contain only ~400 records, the program exits before the batch thread finishes.

So ultimately, my question is, what can I do to ensure that all batches have been processed to completion by the action before the program exits?


Solution

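  • You need to wait for the last block in the pipeline, not the first. batchBlock.Completion only signals that the BatchBlock has handed off all of its batches; the ActionBlock may still be running OutputAverage at that point. Because the link is created with PropagateCompletion = true, actionBlock.Completion finishes once every batch it received has been processed. Just replace this: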

    batchBlock.Completion.Wait();
    

    ...with this:

    actionBlock.Completion.Wait();
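
    For reference, here is a minimal sketch of the same pipeline that awaits the last block instead of blocking on it. The class name BatchDemo and the helper AveragesAsync are hypothetical names introduced only for this illustration; the blocks and link options are the same as in the question. It posts seven values with a batch size of 5 to show that calling Complete() on the BatchBlock should also flush the leftover items as a final, smaller batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchDemo
{
    // Hypothetical helper (not from the question): posts the values, completes the
    // pipeline, and returns the average of each batch once the LAST block is done.
    public static async Task<List<double>> AveragesAsync(IEnumerable<int> values, int batchSize)
    {
        var averages = new List<double>();

        var batchBlock = new BatchBlock<int>(batchSize);

        // An ActionBlock handles one message at a time by default,
        // so adding to the list here needs no extra locking.
        var actionBlock = new ActionBlock<int[]>(batch => averages.Add(batch.Average()));

        // PropagateCompletion makes actionBlock complete after batchBlock does.
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in values)
            batchBlock.Post(value);

        // Signal that no more input is coming; leftover items form a final batch.
        batchBlock.Complete();

        // Wait for the last block in the pipeline, not the first.
        await actionBlock.Completion;
        return averages;
    }

    public static async Task Main()
    {
        // Seven values with a batch size of 5: one full batch plus a batch of two.
        foreach (var average in await AveragesAsync(Enumerable.Range(1, 7), 5))
            Console.WriteLine("The average is: " + average);
    }
}
```

    In a console Main, the blocking actionBlock.Completion.Wait() from the answer works just as well; await is simply the non-blocking equivalent and assumes a C# version that supports an async Main (7.1 or later).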