I receive a sequence of objects (e.g., ItemGroup[]) that each contain multiple jobs (e.g., Item[]) and a max degree of parallelism value, for example:
public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);
The sequence of ItemGroup instances must be processed sequentially, but each ItemGroup may have a max degree of parallelism higher than 1. For example, the pipeline will process the group of A* items sequentially, then process the group of B* items concurrently:
var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};
I thought a custom TransformManyBlock implementation derived from IPropagatorBlock<ItemGroup, Item> would be a good choice, but I am not clear how to properly wait on the TransformManyBlock dynamically created internally as the producer posts ItemGroup instances to it.
Can anyone guide me here?
You could create an inner TransformBlock<Item, Item> for each ItemGroup received. Below is a generalized solution with TInput, TChild and TOutput generic parameters. The TInput corresponds to an ItemGroup, the TChild corresponds to an Item, and the TOutput is also Item, since you propagate the items without transforming them:
public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
    <TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    ArgumentNullException.ThrowIfNull(childrenSelector);
    ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
    ArgumentNullException.ThrowIfNull(transformChild);

    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
        {
            bool accepted = innerBlock.Post(child);
            if (!accepted) break; // The innerBlock has failed
        }
        innerBlock.Complete();

        // Propagate the results
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // Propagate AggregateException
        return results;
    });
}
Usage example:
IPropagatorBlock<ItemGroup, Item> block =
    CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
        x => x.Items, x => x.MaxDegreeOfParallelism, x => x);
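For a fuller picture, here is a minimal end-to-end sketch of how the block might be wired into a pipeline and awaited. It is an illustration under assumptions, not tested production code: the Demo class name, the ActionBlock consumer, and the Thread.Sleep that simulates work are hypothetical additions, and the factory method is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};

// Assumption: the child transform sleeps to simulate work per item.
var block = Demo.CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
    x => x.Items,
    x => x.MaxDegreeOfParallelism,
    item => { Thread.Sleep(100); return item; });

// The consumer runs with the default MaxDegreeOfParallelism of 1,
// so adding to the list needs no extra synchronization.
var received = new List<string>();
var consumer = new ActionBlock<Item>(item => received.Add(item.Name));
block.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var group in groups) block.Post(group);
block.Complete();

// Awaiting the last block in the chain also surfaces upstream failures.
await consumer.Completion;
Console.WriteLine(string.Join(", ", received));

public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);

public static class Demo
{
    // Same logic as the factory method shown in the answer.
    public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
        <TInput, TChild, TOutput>(
        Func<TInput, IEnumerable<TChild>> childrenSelector,
        Func<TInput, int> degreeOfParallelismSelector,
        Func<TChild, TOutput> transformChild)
    {
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
            {
                MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
            });
            foreach (var child in childrenSelector(input))
                if (!innerBlock.Post(child)) break; // The innerBlock has failed
            innerBlock.Complete();

            List<TOutput> results = new();
            while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
                while (innerBlock.TryReceive(out TOutput result))
                    results.Add(result);
            await innerBlock.Completion.ConfigureAwait(false);
            return results;
        });
    }
}
```

Since both the outer and the inner block keep their default EnsureOrdered setting, the names should arrive in input order even though the B* items are processed concurrently. The outer block is left at its default MaxDegreeOfParallelism of 1, which is what keeps the groups themselves sequential.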
Note: The above code has not been tested.
Update: My original implementation (revision 1) was based on the .NET 6 API ReceiveAllAsync, and the TransformManyBlock constructor that takes a Func<TInput,IAsyncEnumerable<TOutput>> argument (.NET 7). The problem was that ReceiveAllAsync doesn't propagate the exception of the enumerated dataflow block, so I switched to manually collecting and propagating the results, by filling a List<TOutput> as shown in this answer.
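The ReceiveAllAsync behavior described in the update can be observed with a small sketch like the following. It assumes a standalone TransformBlock<int, int> that fails on one item; the block, the values, and the faultObserved flag are hypothetical and only serve the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical block that fails on one of its inputs.
var block = new TransformBlock<int, int>(x =>
{
    if (x == 2) throw new InvalidOperationException($"Item {x} failed");
    return x;
});

block.Post(1);
block.Post(2);
block.Post(3);
block.Complete();

// The enumeration simply ends when the block completes,
// whether it completed successfully or in a faulted state.
var received = new List<int>();
await foreach (var value in block.ReceiveAllAsync())
    received.Add(value);
Console.WriteLine($"Enumeration ended normally after {received.Count} item(s)");

// The failure becomes visible only when Completion is observed.
bool faultObserved = false;
try
{
    await block.Completion;
}
catch (InvalidOperationException ex)
{
    faultObserved = true;
    Console.WriteLine($"Completion faulted: {ex.Message}");
}
```

How many items are received before the fault can vary from run to run, because a faulted block may discard output it has already buffered. This is why the method in this answer awaits innerBlock.Completion after draining the output.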