
Linked DataFlow block completion is not working


I have a BroadcastBlock linked to an ActionBlock. When I call Complete on the BroadcastBlock and then immediately on the ActionBlock, it does not work as expected, whereas calling Complete on the BroadcastBlock alone works.

public class ActionTester
{
    private readonly ActionBlock<int> _action;
    private readonly BroadcastBlock<int> _input;

    public ActionTester()
    {
        _input = new BroadcastBlock<int>(null);
        _action = new ActionBlock<int>(i => Process(i));

        _input.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Post(int i) => _input.Post(i);

    public async Task Process(int i)
    {
        await Task.Delay(2000);
        Console.WriteLine(i);
    }

    public void Complete()
    {
        _input.Complete();
        _action.Complete(); // When this is removed, program is working as expected
    }

    public Task Completion => _action.Completion;
}

The test code is

static void Main(string[] args)
{
    var actor = new ActionTester();

    actor.Post(5);
    actor.Post(7);

    actor.Complete();
    actor.Completion.Wait();

    Console.WriteLine("Finished");
    Console.Read();
}

When _action.Complete() is present (the commented line), execution moves straight past actor.Completion.Wait() and "Finished" is displayed without the posted values being printed. If I remove _action.Complete(), the posted values are displayed properly and then "Finished" is written.

When dataflow blocks are linked, should we call Complete on the root block only? Setting PropagateCompletion to true or false has no effect on this behavior.

Solution

Resolved by waiting for the root block's completion before completing the action block:

public void Complete()
{
    _input.Complete();
    _input.Completion.Wait();
    _action.Complete(); 
}

Solution

  • When dataflow blocks are linked, should we just call "Complete" of the root block only?

    Yes, this is exactly what you should do.

    If you propagate completion, you do not need to complete _action yourself: it completes automatically once _input has completed and all buffered items have been processed. Once you call Complete on the action block directly, it stops accepting new messages, so any items the BroadcastBlock has not yet delivered to it are declined. The behavior you describe is therefore expected and valid.
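As a minimal sketch of that advice, the class from the question could be reduced to completing only the root block and letting PropagateCompletion do the rest. The `Program` wrapper and the use of `async Main` (which needs C# 7.1 or later) are assumptions added here for illustration; everything else mirrors the question's code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ActionTester
{
    private readonly BroadcastBlock<int> _input;
    private readonly ActionBlock<int> _action;

    public ActionTester()
    {
        // null cloning function: items are passed to targets as-is
        _input = new BroadcastBlock<int>(null);
        _action = new ActionBlock<int>(i => Process(i));

        // With PropagateCompletion, completing _input also completes _action
        // once _input has finished offering its messages.
        _input.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Post(int i) => _input.Post(i);

    private async Task Process(int i)
    {
        await Task.Delay(2000);
        Console.WriteLine(i);
    }

    // Complete the root block only; do not call _action.Complete() here.
    public void Complete() => _input.Complete();

    // Completes after _action has processed everything it accepted.
    public Task Completion => _action.Completion;
}

// Hypothetical entry point, added for illustration.
public static class Program
{
    public static async Task Main()
    {
        var actor = new ActionTester();

        actor.Post(5);
        actor.Post(7);

        actor.Complete();
        await actor.Completion; // awaiting avoids blocking a thread on Wait()

        Console.WriteLine("Finished");
    }
}
```

This matches the case the question reports as working: the posted values are printed first and "Finished" afterwards. Note that the link must be created with PropagateCompletion = true for this to work; without it, _action.Completion would never finish unless _action were completed separately.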