I'm wondering if there is a neat way for IDataflowBlock.Completion to replace the need for a cancellation token with ReceiveAsync, or a similar method that consumes from a BufferBlock or another IDataflowBlock.
DataflowBlock.ReceiveAsync<TOutput>(ISourceBlock<TOutput>, TimeSpan, CancellationToken)
If InputQueue is a BufferBlock:
BufferBlock<String> InputQueue = new BufferBlock<String>();
for (int i = 0; i < 26; i++)
{
    // enqueue the letters "a" through "z"
    await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
If InputQueue.Complete() has been called, then once the queue is emptied IDataflowBlock.Completion changes to status RanToCompletion, which can be checked with IDataflowBlock.Completion.IsCompleted.
If multiple threads are taking from the queue, this could happen during InputQueue.ReceiveAsync. Is there a neater alternative for handling InputQueue completing than:
try
{
    String parcel = await InputQueue.ReceiveAsync(timeSpan);
}
catch (InvalidOperationException x)
{
    // InputQueue completed with nothing left to receive
}
The simplest way to cancel a Dataflow block is to pass the token to the block's constructor, like this:
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token
});
CancellationToken is defined in the DataflowBlockOptions class, so even a BufferBlock can be canceled.
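As a rough sketch of what that looks like for a BufferBlock (the class and method names here are made up for illustration, they are not from your code): the token goes into DataflowBlockOptions, and once it is canceled the block's Completion task should end up in the Canceled state, with anything still buffered discarded.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancelSketch // hypothetical name, for illustration only
{
    // Returns true if the block finished in the Canceled state.
    public static async Task<bool> CancelBufferAsync()
    {
        using var cancellationSource = new CancellationTokenSource();

        // BufferBlock takes the base DataflowBlockOptions, which carries the token.
        var inputQueue = new BufferBlock<string>(
            new DataflowBlockOptions { CancellationToken = cancellationSource.Token });

        await inputQueue.SendAsync("a");

        // Canceling the token completes the block without any call to Complete().
        cancellationSource.Cancel();

        try
        {
            await inputQueue.Completion;
        }
        catch (OperationCanceledException)
        {
            // Awaiting a canceled Completion task throws; this is the expected path here.
        }

        return inputQueue.Completion.IsCanceled;
    }
}
```

Keep in mind that canceling this way throws away unprocessed items, so it is a different thing from calling Complete() and letting the queue drain.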
Why are you implementing the Receive logic yourself? Is there some restriction that prevents you from using PropagateCompletion when linking your blocks? For example, if your code looks like this:
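// async Task rather than void, so that await compiles and callers can observe the result
internal async Task HandleMessage()
{
    try
    {
        var parcel = await InputQueue.ReceiveAsync(timeSpan);
        // handle parcel
    }
    catch (InvalidOperationException x)
    {
        // InputQueue completed with nothing left to receive
    }
}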
Then you can simply use an ActionBlock like this:
var InputQueue = new BufferBlock<string>();
var Handler = new ActionBlock<string>(parcel =>
{
    // handle parcel
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
InputQueue.LinkTo(Handler, linkOptions);
// once you call Complete on InputQueue, the completion is propagated to the Handler block
for (int i = 0; i < 26; i++)
{
    await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
InputQueue.Complete();
// finishes after Handler has processed every parcel
await Handler.Completion;
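If you really do have to pull items yourself instead of linking an ActionBlock, one option that avoids the try/catch is to wait on OutputAvailableAsync and then take items with TryReceive. The following is only a sketch under that assumption; DrainSketch, DrainAsync and RunAsync are names I made up for the example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DrainSketch // hypothetical name, for illustration only
{
    // Pulls items until the source is completed and empty, without relying on exceptions.
    public static async Task<List<string>> DrainAsync(IReceivableSourceBlock<string> source)
    {
        var received = new List<string>();

        // OutputAvailableAsync yields false once the block has completed and holds no more items.
        while (await source.OutputAvailableAsync())
        {
            // Another consumer may have taken the item in the meantime,
            // so TryReceive is used instead of assuming one is still there.
            while (source.TryReceive(out var parcel))
            {
                received.Add(parcel);
            }
        }

        return received;
    }

    // Usage: fill the queue with "a".."z", complete it, then drain it.
    public static async Task<int> RunAsync()
    {
        var inputQueue = new BufferBlock<string>();
        for (int i = 0; i < 26; i++)
        {
            await inputQueue.SendAsync(((char)(97 + i)).ToString());
        }
        inputQueue.Complete();

        var parcels = await DrainAsync(inputQueue);
        return parcels.Count;
    }
}
```

With several consumers running DrainAsync on the same block, each item should still go to exactly one of them, and every loop ends on its own once Complete() has been called and the queue is empty.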
Also note that if you need some interaction with the UI, you can expose your last block as an IObservable and consume it with the Rx.NET library.