Tags: c#, tpl-dataflow, .net-core-3.1

Can not run TPL Dataflow pipeline


I am trying to create a pipeline using TPL Dataflow where I can store messages in a BatchBlock, and whenever its threshold is hit it sends the batch to an ActionBlock. I have added a BufferBlock in case the ActionBlock is too slow.
So far I have tried every method I could think of to move data from the first block to the second, to no avail. I have linked the blocks and set DataflowLinkOptions with PropagateCompletion set to true. What else do I have to do for this pipeline to work?

Pipeline

class LogPipeline<T>
{
    private ActionBlock<T[]> actionBlock;
    private BufferBlock<T> bufferBlock;
    private BatchBlock<T> batchBlock;
    private readonly Action<T[]> action;
    private readonly int BufferSize;
    private readonly int BatchSize;

    public LogPipeline(Action<T[]> action, int bufferSize = 4, int batchSize = 2)
    {
        this.BufferSize = bufferSize;
        this.BatchSize = batchSize;
        this.action = action;
    }
    private void Initialize()
    {
        this.bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
            { TaskScheduler = TaskScheduler.Default,
            BoundedCapacity = this.BufferSize });
        this.actionBlock = new ActionBlock<T[]>(this.action);
        this.batchBlock = new BatchBlock<T>(BatchSize);
        this.bufferBlock.LinkTo(this.batchBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
        this.batchBlock.LinkTo(this.actionBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
    }
    public void Post(T log)
    {
        this.bufferBlock.Post(log);
    }
    public void Start()
    {
        this.Initialize();
    }
    public void Stop()
    {
        actionBlock.Complete();
    }
}

Test

[TestCase(100, 1000, 5)]
public void CanBatchPipelineResults(int batchSize, int bufferSize, int cycles)
{

    List<int> data = new List<int>();
    LogPipeline<int> logPipeline = new LogPipeline<int>(
       batchSize: batchSize,
       bufferSize: bufferSize,
       action: (logs) =>
       {
           data.AddRange(logs);
       });
    logPipeline.Start();

    int SelectWithEffect(int element)
    {
        logPipeline.Post(element);
        return 3;
    }
    int count = 0;
    while (true)
    {
        if (count++ > cycles)
        {
            break;
        }
        var sent = Parallel.For(0, bufferSize, (x) => SelectWithEffect(x));
    }
    logPipeline.Stop();
    Assert.IsTrue(data.Count == cycles * batchSize);
}

Why are all my blocks empty except the buffer? I have also tried SendAsync, to no avail. No data moves from the first block to the next no matter what I do.

I have tried both with and without the link options.

Update: I have completely removed the pipeline class and the Parallel.For. I have tried all kinds of input blocks (batch/buffer/transform), and subsequent blocks never receive anything.
I have also tried await SendAsync as well as Post.
I have only tried this inside unit test classes.
Could that be the issue?

Update 2: I was wrong to complicate things, so I tried a simpler example. Inside a test case, even this doesn't work:

    List<int> items = new List<int>();
    var tf = new TransformBlock<int, int>(x => x + 1);
    var action = new ActionBlock<int>(x => items.Add(x));
    tf.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
    tf.Post(3); // Breakpoint here
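For reference, the minimal example above does propagate once the pipeline is completed and awaited before inspecting results; a sketch of that check (assuming the System.Threading.Tasks.Dataflow package is referenced):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        var items = new List<int>();
        var tf = new TransformBlock<int, int>(x => x + 1);
        var action = new ActionBlock<int>(x => items.Add(x));
        tf.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

        tf.Post(3);

        tf.Complete();           // signal no more input
        await action.Completion; // let completion propagate and the pipeline drain

        Console.WriteLine(items.Count); // 1
        Console.WriteLine(items[0]);    // 4
    }
}
```

Setting a breakpoint right after Post will usually show the downstream blocks still empty, because the message transfer happens on task-pool tasks that have not run yet; only after awaiting Completion is the result guaranteed to be visible.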


Solution

  • The reason nothing seems to happen before the test ends is that none of the blocks gets a chance to run. The code saturates all CPUs with Parallel.For, so no other task can execute. This means all posted messages are still sitting in the first block. The code then calls Complete on the last block but doesn't even await its Completion before checking the results.

    The code can be simplified a lot. For starters, all blocks have their own input buffers; they don't need extra buffering.

    The pipeline could be replaced with just this:

    //Arrange
    var list = new List<int>();

    var head = new BatchBlock<int>(batchSize);
    var act = new ActionBlock<int[]>(nums => list.AddRange(nums));

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    head.LinkTo(act, options);

    //ACT

    //Just fire everything at once, because why not
    var tasks = Enumerable.Range(0, cycles)
        .Select(i => Task.Run(() => head.Post(i)));
    await Task.WhenAll(tasks);

    //Tell the head block we're done
    head.Complete();
    //Wait for the last block to complete
    await act.Completion;

    //ASSERT
    Assert.Equal(cycles, list.Count);
    

    There's no real need to create a complex class to encapsulate the pipeline. It doesn't "start": the blocks simply do nothing if they have no data. To abstract it, one only needs to provide access to the head block and the last block's Completion task.
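If a wrapper is still wanted, a minimal sketch along those lines might look like this (the class and method names here are illustrative, not part of any library):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: exposes the head block for posting and returns
// the tail block's Completion so callers can await a clean shutdown.
class BatchLogger<T>
{
    private readonly BatchBlock<T> head;
    private readonly ActionBlock<T[]> tail;

    public BatchLogger(Action<T[]> action, int batchSize)
    {
        head = new BatchBlock<T>(batchSize);
        tail = new ActionBlock<T[]>(action);
        head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public bool Post(T item) => head.Post(item);

    // Completing the head flushes any partial batch (PropagateCompletion
    // carries the signal to the tail), whose Completion we hand back.
    public Task StopAsync()
    {
        head.Complete();
        return tail.Completion;
    }
}

class Program
{
    static async Task Main()
    {
        var list = new List<int>();
        var logger = new BatchLogger<int>(nums => list.AddRange(nums), batchSize: 2);
        for (int i = 0; i < 5; i++) logger.Post(i);
        await logger.StopAsync();
        Console.WriteLine(list.Count); // 5 (two full batches plus the flushed partial one)
    }
}
```

Note there is no Start method at all: the blocks are live as soon as they are constructed and linked, and shutdown is just Complete plus awaiting Completion.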