c# .net task-parallel-library tpl-dataflow

DataflowBlock.Complete() supposedly stops the block from producing more messages. What happens to the queued items?


The documentation of .Complete() says the following:

Signals to the IDataflowBlock that it should not accept nor produce any more messages nor consume any more postponed messages.

So let's say I have a BatchedJoinBlock that I want to complete when its source block completes, but I can't propagate completion through the links because they are filtered by predicates:

block1.LinkTo(block2.Target1, predicate);
block1.LinkTo(block2.Target2, x => !predicate(x));

await block1.Completion;
block2.Complete();

According to the documentation, that last line will stop block2 from producing new messages. The problem is that block1 completes as soon as block2 has accepted all of its messages, but those accepted messages might not have been processed yet. What happens to them if block2 can't produce new messages?

Perhaps I'm misinterpreting "produce any more messages"?


Solution

  • Complete means the block will stop accepting new messages, but it will still process the messages already in its InputQueue, even though doing so sends messages forward in the pipeline. This is unlike a block that is faulted because of an exception, which discards its buffered messages.

    You can see that with this simple example:

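    // Slow transform, so items are still queued when Complete() is called.
    var transformBlock = new TransformBlock<int, int>(async value =>
    {
        await Task.Delay(100);
        return value;
    });

    var actionBlock = new ActionBlock<int>(value => Console.WriteLine(value));
    transformBlock.LinkTo(actionBlock);

    for (int i = 0; i < 1000; i++)
    {
        await transformBlock.SendAsync(i);
    }

    // Stop accepting new input; the queued items are still processed.
    transformBlock.Complete();
    Console.WriteLine("complete");
    // Finishes only after every queued item has been handed to actionBlock.
    await transformBlock.Completion;
    Console.WriteLine("completed");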
    

    "complete" is written immediately, but the TransformBlock continues processing its queued messages and slowly moves them to the ActionBlock. "completed" is written only after the last of them has been handed over.


    I too am not entirely sure what "produce any more messages" actually means. I assume it relates to blocks that generate messages on their own rather than ones that process incoming messages, though I can't think of one.
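
To check this against the BatchedJoinBlock scenario from the question, here is a minimal sketch. The names CompleteDemo, RunAsync, source, join, sink and isEven are made up for illustration, and an even/odd test stands in for the question's predicate. It counts how many items come out of the join block after it has been completed manually. If Complete() discarded accepted items, the count would fall short of the number sent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompleteDemo
{
    // Hypothetical helper: sends itemCount integers through predicate-filtered
    // links into a BatchedJoinBlock, completes it by hand, and returns the
    // number of items that were emitted downstream.
    public static async Task<int> RunAsync(int itemCount, int batchSize)
    {
        var source = new BufferBlock<int>();
        var join = new BatchedJoinBlock<int, int>(batchSize);

        // Assumption: an even/odd split stands in for the question's predicate.
        Predicate<int> isEven = n => n % 2 == 0;
        source.LinkTo(join.Target1, isEven);
        source.LinkTo(join.Target2, n => !isEven(n));

        int received = 0;
        var sink = new ActionBlock<Tuple<IList<int>, IList<int>>>(batch =>
        {
            // ActionBlock runs one message at a time by default,
            // so a plain counter is enough here.
            received += batch.Item1.Count + batch.Item2.Count;
        });
        join.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
        {
            await source.SendAsync(i);
        }

        // Same pattern as in the question: wait for the source, then
        // complete the join block manually.
        source.Complete();
        await source.Completion;
        join.Complete();

        // Wait for the end of the pipeline, not just for the join block.
        await sink.Completion;
        return received;
    }
}
```

Calling `await CompleteDemo.RunAsync(10, 4)` should return 10: two full batches of four, plus a final partial batch of two that is flushed when Complete() is called. Awaiting the last block's Completion, rather than the join block's, is what ensures the count is read only after everything has been delivered.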