I have a transform block that is linked to an action block based on a predicate.
// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
    new TransformBlock<Document, Document>(async document =>
    {
        return await CreateAsync(document); // REST API call that sets document.NewId
    },
    new ExecutionDataflowBlockOptions {
        BoundedCapacity = 100,
        MaxDegreeOfParallelism = 20
    });
public ActionBlock<Document> SplitPipelineActionBlock =
    new ActionBlock<Document>(async document =>
    { // implementation obfuscated
    },
    new ExecutionDataflowBlockOptions {
        BoundedCapacity = 100
    });
// Shared block elements
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
    PropagateCompletion = true };
// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
    CommonLinkOptions,
    document => !string.IsNullOrEmpty(document.NewId));
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
    CommonLinkOptions);
The transform block tries to create a document via a REST API, which should update the Document object with a NewId. The LinkTo predicate therefore checks that the returned Document has a NewId. For any objects that don't meet this criterion, there is a NullTarget block to empty the TransformBlock.
In my test, I posted 10,100 items to the pipeline and confirmed that all items returned a NewId successfully. However, 130 items are being passed to the NullTarget. When I rerun the program over the entire set again, more than 3000 items are passed to the NullTarget, including items that previously stored a NewId successfully.
I suspect the issue is that the SplitPipelineActionBlock BoundedCapacity is full, so the LinkTo ignores the predicate and passes the item on to the next LinkTo, which is the NullTarget.
How can I make sure that all items are given a chance to be sent to the SplitPipelineActionBlock?
When an item is available from a source block, the block offers it to its links one at a time, in the order they were linked; if a link does not accept the item, the item is offered to the next link. The source does not distinguish why a link wasn't used. Target blocks do distinguish "maybe later" replies from "no" replies (Postponed vs Declined), but in either case the source moves on to the next link, looking for a linked block that can take the item now.
Your best option to resolve this is to add a predicate to the null block link that is the negative of the predicate for the target block link.
Predicate<Document> predicate = document => !string.IsNullOrEmpty(document.NewId);
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
    CommonLinkOptions,
    predicate);
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
    CommonLinkOptions,
    document => !predicate(document));
This way, when SplitPipelineActionBlock is full, the item is still offered to the null link, but the null link rejects it because it fails the negated predicate. The item then stays in the transform block's output buffer until SplitPipelineActionBlock has room.
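To see the effect in isolation, here is a minimal, self-contained sketch rather than the original pipeline. It assumes the System.Threading.Tasks.Dataflow library is referenced. The Doc class, the CountProcessedAsync helper, the guardNullLink flag, the capacity of 10 and the 1 ms delay are all hypothetical stand-ins chosen for illustration; the transform simply assigns a GUID instead of calling a REST API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's Document type.
public sealed class Doc
{
    public string NewId { get; set; } = "";
}

public static class LinkPredicateDemo
{
    // Pushes itemCount documents through a transform block linked to a slow,
    // bounded consumer and to a NullTarget. Returns how many reached the consumer.
    public static async Task<int> CountProcessedAsync(int itemCount, bool guardNullLink)
    {
        int processed = 0;

        // Stand-in for the REST call: every document gets a NewId.
        var create = new TransformBlock<Doc, Doc>(doc =>
        {
            doc.NewId = Guid.NewGuid().ToString();
            return doc;
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 100, MaxDegreeOfParallelism = 20 });

        // Deliberately slow consumer with a small buffer, so it is often full.
        var consumer = new ActionBlock<Doc>(async doc =>
        {
            await Task.Delay(1);
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        Predicate<Doc> hasNewId = doc => !string.IsNullOrEmpty(doc.NewId);

        create.LinkTo(consumer, linkOptions, hasNewId);

        if (guardNullLink)
        {
            // The fix: the null link only accepts documents that fail the predicate.
            create.LinkTo(DataflowBlock.NullTarget<Doc>(), linkOptions, doc => !hasNewId(doc));
        }
        else
        {
            // The original setup: the null link accepts whatever the consumer
            // does not take immediately, including postponed items.
            create.LinkTo(DataflowBlock.NullTarget<Doc>(), linkOptions);
        }

        for (int i = 0; i < itemCount; i++)
        {
            await create.SendAsync(new Doc());
        }

        create.Complete();
        await consumer.Completion;
        return processed;
    }

    public static async Task Main()
    {
        int guarded = await CountProcessedAsync(200, guardNullLink: true);
        Console.WriteLine($"guarded null link:   {guarded} of 200 reached the consumer");

        int unguarded = await CountProcessedAsync(200, guardNullLink: false);
        Console.WriteLine($"unguarded null link: {unguarded} of 200 reached the consumer");
    }
}
```

With the guarded link, every item should reach the consumer, because nothing can be accepted by the null target while it has a NewId. With the unguarded link, the count depends on timing and will typically be lower, which matches the varying numbers of dropped items seen between runs in the question.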