Search code examples
c#task-parallel-librarytpl-dataflow

TPL Dataflow: Why is the following blocking?


Why is as a call to Run() blocking and doesn't return?

It correclty prints the following output, which makes sense. As I have initialized the BatchBlock() with a size of 170 and create 200 volapoint objects. But why does it not return?

ConvertToVolaSurface 
ConvertToVolaSurface 
CalculateStrangles
CalculateStrangles

This is the code snippet i am working with

class Pipeline
{
    public void Run()
    {
        // Grouping block - Collect vola points until vola surface is full 
        var batchBlock1 = new BatchBlock<VolaPoint>(170);

        // Execution block - Convert vola points to surface
        var transformBlock0 = new TransformBlock<VolaPoint[], VolaSurface>(x => this.ConvertToVolaSurface(x));

        // Execution block - Calculate strangles 
        var transformBlock1 = new TransformBlock<VolaSurface, VolaSurface>(x => this.CalculateStrangles(x));

        var linkOptions = new DataflowLinkOptions()
        {
            PropagateCompletion = true
        };

        batchBlock1.LinkTo(transformBlock0, linkOptions);
        transformBlock0.LinkTo(transformBlock1, linkOptions);

        for (int i = 0; i <= 200; i++)
        {
            batchBlock1.Post(new VolaPoint());
        }

        batchBlock1.Complete();

        transformBlock1.Completion.Wait();
    }

    private VolaSurface ConvertToVolaSurface(VolaPoint[] volapoints)
    {
        Debug.WriteLine("ConvertToVolaSurface");

        return new VolaSurface();
    }

    private VolaSurface CalculateStrangles(VolaSurface volaSurface)
    {
        Debug.WriteLine("CalculateStrangles");

        return volaSurface;
    }
}   

Solution

  • Your last block is a TransformBlock, transform blocks have an output buffer that must be empty for them to Complete. Change your last block to an ActionBlock and await it's completion to not block the calling thread.

    public async Task Run()
    {
        // Grouping block - Collect vola points until vola surface is full 
        var batchBlock1 = new BatchBlock<VolaPoint>(170);
    
        // Execution block - Convert vola points to surface
        var transformBlock0 = new TransformBlock<VolaPoint[], VolaSurface>(x => this.ConvertToVolaSurface(x));
    
        // Execution block - Calculate strangles 
        var actionBlock1 = new ActionBlock<VolaSurface>(x => this.CalculateStrangles(x));
    
        var linkOptions = new DataflowLinkOptions()
        {
            PropagateCompletion = true
        };
    
        batchBlock1.LinkTo(transformBlock0, linkOptions);
        transformBlock0.LinkTo(actionBlock1, linkOptions);
    
        for (int i = 0; i <= 200; i++)
        {
            batchBlock1.Post(new VolaPoint());
        }
    
        batchBlock1.Complete();
    
        await actionBlock1.Completion;
    }
    

    Now if CalculateStrangles(x) is returning something then you need to link that last transform block to something else otherwise you'll never complete.