Using TPL Dataflow, how do I compose a workflow with a BatchBlock in the middle and then go back to per-item blocks?


The idea is to process a list of items individually, then batch them up, and then go back to processing them individually, all seamlessly. In the batch-processing block I may be querying or saving to the database. Hitting the database once per batch is much more efficient than hitting it once for each item in the list.

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var block1 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 1 - Item {item.Id}");
                return item;
            }, execOptions);
            var block2 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform more work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 2 - Item {item.Id}");
                return item;
            }, execOptions);
            var batch = new BatchBlock<WorkItem>(5);
            var batchWork = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"batchWork - {items.Length} Items");
                // perform batch work - query database, etc.
                await Task.Delay(2000);
                await Task.WhenAll(items.Select(x => block2.SendAsync(x)));
            }, execOptions);
            var batch2 = new BatchBlock<WorkItem>(10);
            var save = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"save - {items.Length} Items");
                // save items to the DB
                await Task.Delay(2000);
            }, execOptions);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            block1.LinkTo(batch, linkOptions);
            batch.LinkTo(batchWork, linkOptions);
            block2.LinkTo(batch2, linkOptions);
            batch2.LinkTo(save, linkOptions);

            Console.WriteLine("Starting work");
            var workItems = Enumerable.Range(1, 10).Select(x => new WorkItem { Id = x }).ToArray();
            await Task.WhenAll(workItems.Select(x => block1.SendAsync(x)));
            block1.Complete();
            await batchWork.Completion;

            block2.Complete();
            await save.Completion;

            Console.WriteLine("All Done");
            Console.WriteLine("Hit Enter");
            Console.ReadLine();
        }
    }
    class WorkItem
    {
        public int Id { get; set; }

    }
}

I am looking for some feedback. The code sample above seems to work. The critical piece is inside "batchWork", where I queue each item to "block2" by calling SendAsync. I don't know of any other way to link the two blocks. Perhaps there is a better approach to what I am trying to accomplish here. Any suggestions?


Solution

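  • You don't need to use SendAsync. You can change batchWork to a TransformManyBlock and link it to the next block. A TransformManyBlock emits each element of the collection it returns as a separate message, so the batch is unpacked into individual items automatically: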

    var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
    {
        Console.WriteLine($"batchWork - {items.Length} Items");
        // perform batch work - query database, etc.
        await Task.Delay(2000);
        // each element of the returned array is forwarded as its own message
        return items;
    }, execOptions);
    
    ....
    
    batch.LinkTo(batchWork, linkOptions);
    batchWork.LinkTo(block2, linkOptions);
    block2.LinkTo(batch2, linkOptions);
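
For reference, here is a minimal sketch of the whole pipeline once every block is linked this way. The names `LinkedPipeline`, `RunAsync`, and the `saved` counter are made up for illustration, and the delays are shortened stand-ins for real work. It assumes every link uses PropagateCompletion, in which case it should be enough to complete the first block and await the last one, without the manual block2.Complete() call from the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class WorkItem
{
    public int Id { get; set; }
}

static class LinkedPipeline
{
    // Hypothetical helper: pushes itemCount items through the pipeline and
    // returns how many of them reached the final "save" block.
    public static async Task<int> RunAsync(int itemCount)
    {
        var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        int saved = 0;

        var block1 = new TransformBlock<WorkItem, WorkItem>(async item =>
        {
            await Task.Delay(10); // stand-in for per-item work
            return item;
        }, execOptions);

        var batch = new BatchBlock<WorkItem>(5);

        var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
        {
            await Task.Delay(20); // stand-in for a batched database query
            return items;         // unpacked into individual messages
        }, execOptions);

        var block2 = new TransformBlock<WorkItem, WorkItem>(async item =>
        {
            await Task.Delay(10); // stand-in for more per-item work
            return item;
        }, execOptions);

        var batch2 = new BatchBlock<WorkItem>(10);

        var save = new ActionBlock<WorkItem[]>(async items =>
        {
            await Task.Delay(20); // stand-in for a batched save
            Interlocked.Add(ref saved, items.Length);
        }, execOptions);

        // One unbroken chain, so completion flows from block1 to save.
        block1.LinkTo(batch, linkOptions);
        batch.LinkTo(batchWork, linkOptions);
        batchWork.LinkTo(block2, linkOptions);
        block2.LinkTo(batch2, linkOptions);
        batch2.LinkTo(save, linkOptions);

        for (int i = 1; i <= itemCount; i++)
        {
            await block1.SendAsync(new WorkItem { Id = i });
        }

        block1.Complete();     // signal that no more input is coming
        await save.Completion; // wait for the last block to drain
        return saved;
    }
}

class Program
{
    static async Task Main()
    {
        int saved = await LinkedPipeline.RunAsync(12);
        Console.WriteLine($"Saved {saved} items");
    }
}
```

When completion reaches a BatchBlock, it flushes whatever items it is still holding as a final, smaller batch, so item counts that are not a multiple of the batch size still make it through to the save block.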