Search code examples
c#task-parallel-librarytpl-dataflow

C# using DataflowBlock.Completion to cancel consumer tasks instead of CancellationToken


I'm wondering if there is a neat way for IDataflowBlock.Completion to replace needing to use a cancellation token for ReceiveAsync or a similar method which consumes from BufferBlock or another IDataflowBlock.

IDataflowBlock.ReceiveAsync<T>(TimeSpan, CancellationToken)

If InputQueue is a BufferBlock:

BufferBlock<String> InputQueue 

for (int i = 0; i < 26; i++)            
{
    await InputQueue.SendAsync(((char)(97 + i)).ToString());
}

If InputQueue.Complete(); has been called, then when the queue is emptied and IDataflowBlock.Completion will change to status RanToCompletion, which can be checked with IDataflowBlock.Completion.IsCompleted.

If multiple threads are taking from the queue this could happen during InputQueue.ReceiveAsync, is there a neater alternative to handle InputQueue completing than:

try
{
    String parcel = await InputQueue.ReceiveAsync(timeSpan);
}
catch(InvalidOperationException x)
{

}

Solution

  • The simplest way to cancel a Dataflow Block is to provide the token to block's constructor, like this:

    new ExecutionDataflowBlockOptions
    {
        CancellationToken = cancellationSource.Token
    });
    

    CancellationToken is defined in Dataflow​Block​Options class, so even BufferBlock could be canceled.

    Why are you implementing the Receive logic by yourself? Is there some restriction no to use the PropagateCompletion with linking your blocks? For example, if your code looks like this:

    internal void HandleMessage()
    {
        try
        {
            var parcel = await InputQueue.ReceiveAsync(timeSpan);
            // handle parsel
        }
        catch(InvalidOperationException x)
        {
        }
    }
    

    Then you simply may use the ActionBlock like this:

    var InputQueue = new BufferBlock<string>();
    var Handler = new ActionBlock<string>(parcel =>
    {
        // handle parsel
    });
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    InputQueue.LinkTo(Handler, linkOptions);
    
    // now after you call Complete method for InputQueue the completion will be propagated to your Handler block:
    for (int i = 0; i < 26; i++)            
    {
        await InputQueue.SendAsync(((char)(97 + i)).ToString());
    }
    InputQueue.Complete();
    await Handler.Completion;
    

    Also note that if you need some interaction with UI, you may use your last block as IObservable with Rx.Net library.