Tags: c#, task-parallel-library, .net-4.5, tpl-dataflow

How to implement continuously running dataflow blocks in TPL?


I have a producer/consumer dataflow pipeline set up using a BufferBlock and an ActionBlock, and it works fine inside a console application:

After adding all items into the BufferBlock and linking the BufferBlock to the other blocks, everything processes correctly.

Now I want to use this inside a service, where the dataflow pipeline stays up permanently; when messages arrive through external events, they should go into the BufferBlock, and that should start processing. How can I achieve this?

So far I have done the following:

public void SetupPipeline()
{
    // Handles each incoming message with unlimited parallelism.
    FirstBlock = new ActionBlock<WorkItem>(ProcessIncomingMessage,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    // Head of the pipeline: external events post WorkItems here.
    BufferBlock = new BufferBlock<WorkItem>();

    var groupingOptions = new GroupingDataflowBlockOptions();
    groupingOptions.Greedy = true;
    groupingOptions.BoundedCapacity = DataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    groupingOptions.CancellationToken = CancellationToken;

    // Groups items into arrays of size BoundingCapacity.
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, groupingOptions);

    // Consumes each completed batch.
    ProcessItems = new ActionBlock<WorkItem[]>(workItems =>
        ProcessWorkItems(workItems.ToList()),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = CancellationToken
        });

    // One-shot timer; armed by TimingBlock to flush a partial batch.
    Timer = new Timer(_ => BatchBlock.TriggerBatch());

    // Pass-through block that (re)arms the flush timer on every item.
    TimingBlock = new TransformBlock<WorkItem, WorkItem>(workItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + workItem);
        return workItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    // Pipeline: BufferBlock -> TimingBlock -> BatchBlock -> ProcessItems.
    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
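
To keep this pipeline alive inside a long-running service, nothing special is required: construct the blocks once at service start-up and post into the head block whenever an external event fires. A minimal sketch, assuming a hypothetical event handler (`OnMessageReceived` and `MessageEventArgs` are illustrative names, not from the original):

```csharp
// Hypothetical event handler in the service: each external message is
// posted into the head of the pipeline. The blocks stay alive for the
// lifetime of the service, sitting idle until items arrive.
private void OnMessageReceived(object sender, MessageEventArgs e)
{
    // Post returns false if the block has been completed (or is full
    // when BoundedCapacity is set); SendAsync can be awaited instead.
    if (!BufferBlock.Post(e.WorkItem))
    {
        logger.Warn("Pipeline is no longer accepting messages.");
    }
}
```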

Solution

  • Your batch size is defined by the variable 'BoundingCapacity' passed to the BatchBlock constructor. A batch will be posted when:

    • a number of posts equal to the batch size has been received (as specified in the constructor),
    • the batch block is marked for completion, or
    • the TriggerBatch method is called.

    It seems like you want a batch to post when the batch size is met or a timeout occurs. If this is the case, and if the batch size is not critical, I would really just give the timer you already have a recurring interval and make the block downstream of the BatchBlock ignore empty posts.
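
    A sketch of that recurring-interval approach, reusing the question's field names (the period reuses `TimerInterval`; that choice is an assumption):

```csharp
// Re-arm the timer with a recurring period instead of a one-shot delay,
// so TriggerBatch fires periodically regardless of when items arrived.
Timer = new Timer(_ => BatchBlock.TriggerBatch(),
    null, TimerInterval, TimerInterval);

// Downstream block defensively skips batches with no items, as
// suggested above, in case an empty array ever comes through.
ProcessItems = new ActionBlock<WorkItem[]>(workItems =>
{
    if (workItems.Length == 0)
        return;
    ProcessWorkItems(workItems.ToList());
});
```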

    What you may actually want, and what is most in line with the philosophy of dataflow programming, is to create a new BatchBlock when you begin posting a series of items and then complete it when done or when a timeout occurs. New posts would create a new BatchBlock if one does not already exist.
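
    A rough sketch of that idea, under the assumption that `Timer` was constructed with `OnTimeout` as its callback (the helper names here are illustrative, not from the original; completing a BatchBlock flushes any partial batch to its linked target, per the second bullet above):

```csharp
private BatchBlock<WorkItem> currentBatch;
private readonly object batchLock = new object();

public void PostItem(WorkItem item)
{
    lock (batchLock)
    {
        // Lazily create a fresh BatchBlock for each series of posts.
        if (currentBatch == null)
        {
            currentBatch = new BatchBlock<WorkItem>(BoundingCapacity);
            currentBatch.LinkTo(ProcessItems);
            // Arm a one-shot timeout for this series.
            Timer.Change(TimerInterval, Timeout.Infinite);
        }
        currentBatch.Post(item);
    }
}

private void OnTimeout(object state)
{
    lock (batchLock)
    {
        // Completion posts whatever partial batch has accumulated.
        currentBatch?.Complete();
        currentBatch = null;
    }
}
```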

    The problem with trying to implement a timeout timer around the BatchBlock that fires only after the first post is that you will either need to count and verify posts into the BufferBlock, or watch posts coming out of it. Both of these scenarios will create a lot of ugliness and/or violate block encapsulation.