The idea is to process a list of items individually, then in batches, and then individually again, all in one seamless pipeline. In the batch-processing block I may be querying or saving to the database. Hitting the DB once per batch is much more efficient than hitting it once for each item in the list.

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };

            var block1 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 1 - Item {item.Id}");
                return item;
            }, execOptions);

            var block2 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform more work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 2 - Item {item.Id}");
                return item;
            }, execOptions);

            var batch = new BatchBlock<WorkItem>(5);

            var batchWork = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"batchWork - {items.Length} Items");
                // perform batch work - query database, etc.
                await Task.Delay(2000);
                await Task.WhenAll(items.Select(x => block2.SendAsync(x)));
            }, execOptions);

            var batch2 = new BatchBlock<WorkItem>(10);

            var save = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"save - {items.Length} Items");
                // save items to the DB
                await Task.Delay(2000);
            }, execOptions);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            block1.LinkTo(batch, linkOptions);
            batch.LinkTo(batchWork, linkOptions);
            block2.LinkTo(batch2, linkOptions);
            batch2.LinkTo(save, linkOptions);

            Console.WriteLine("Starting work");
            var workItems = Enumerable.Range(1, 10).Select(x => new WorkItem { Id = x }).ToArray();
            await Task.WhenAll(workItems.Select(x => block1.SendAsync(x)));
            block1.Complete();
            await batchWork.Completion;
            block2.Complete();
            await save.Completion;

            Console.WriteLine("All Done");
            Console.WriteLine("Hit Enter");
            Console.ReadLine();
        }
    }

    class WorkItem
    {
        public int Id { get; set; }
    }
}

I am looking for some feedback. The code sample above seems to work. The critical piece is inside "batchWork", where I queue items up to "block2" by calling SendAsync on each one. I don't know of any other way to link the two blocks. Is there a better approach to what I am trying to accomplish here? Any suggestions?
You don't need to use SendAsync. You can change batchWork to a TransformManyBlock and connect it to the next block:

var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
{
    Console.WriteLine($"batchWork - {items.Length} Items");
    // perform batch work - query database, etc.
    await Task.Delay(2000); // simulated batch work, as in the question
    // each returned item is emitted individually to the next block
    return items;
}, execOptions);
....
batch.LinkTo(batchWork, linkOptions);
batchWork.LinkTo(block2, linkOptions);
block2.LinkTo(batch2, linkOptions);
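
One consequence worth spelling out: once every block is linked with PropagateCompletion = true, completion flows down the whole chain, so the manual `await batchWork.Completion; block2.Complete();` step from the question should no longer be needed. The following is only a minimal sketch of that, not a drop-in replacement: the `PipelineSketch` class and its `RunAsync` method are hypothetical names, the per-item work is reduced to pass-through lambdas, and the short delay stands in for the real database call.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class WorkItem
{
    public int Id { get; set; }
}

// Hypothetical helper for illustration; not part of the original code.
static class PipelineSketch
{
    // Pushes `count` items through the fully linked pipeline and
    // returns how many items reached the final save block.
    public static async Task<int> RunAsync(int count)
    {
        var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        int saved = 0;

        // Per-item work is omitted here; items simply pass through.
        var block1 = new TransformBlock<WorkItem, WorkItem>(item => item, execOptions);
        var batch = new BatchBlock<WorkItem>(5);
        var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
        {
            await Task.Delay(10); // placeholder for the batched DB call
            return items;         // items are emitted one by one to block2
        }, execOptions);
        var block2 = new TransformBlock<WorkItem, WorkItem>(item => item, execOptions);
        var batch2 = new BatchBlock<WorkItem>(10);
        var save = new ActionBlock<WorkItem[]>(items =>
        {
            // Count saved items; Interlocked because batches may run in parallel.
            Interlocked.Add(ref saved, items.Length);
        }, execOptions);

        // Every link propagates completion, so the chain is unbroken.
        block1.LinkTo(batch, linkOptions);
        batch.LinkTo(batchWork, linkOptions);
        batchWork.LinkTo(block2, linkOptions);
        block2.LinkTo(batch2, linkOptions);
        batch2.LinkTo(save, linkOptions);

        foreach (var id in Enumerable.Range(1, count))
        {
            await block1.SendAsync(new WorkItem { Id = id });
        }

        // Completing the head is enough; no manual block2.Complete() is needed.
        block1.Complete();
        await save.Completion;
        return saved;
    }
}
```

Awaiting only `save.Completion` works here because each BatchBlock flushes any partial batch when it completes, so no items are left behind when the item count is not a multiple of the batch size.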