c# .net task-parallel-library tpl-dataflow

Custom Dataflow transform block with dynamic MaxDegreeOfParallelism


I receive a sequence of objects (e.g., ItemGroup[]), each of which contains multiple jobs (e.g., Item[]) and a maximum degree of parallelism, for example:

public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);

The sequence of ItemGroup instances must be processed sequentially, but each ItemGroup may have a max degree of parallelism higher than 1. For example, the pipeline will process the group of A* items sequentially, then process the group of B* items concurrently:

var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};

I thought a custom TransformManyBlock implementation derived from IPropagatorBlock<ItemGroup, Item> would be a good choice, but it is not clear to me how to properly await the TransformManyBlock instances that are created dynamically inside it as the producer posts ItemGroup instances to it.

Can anyone guide me here?


Solution

  • You could create an inner TransformBlock<Item, Item> for each ItemGroup received. Below is a generalized solution with TInput, TChild and TOutput generic parameters. The TInput corresponds to an ItemGroup, the TChild corresponds to an Item, and the TOutput is also Item, since you propagate the items without transforming them:

    public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
        <TInput, TChild, TOutput>(
        Func<TInput, IEnumerable<TChild>> childrenSelector,
        Func<TInput, int> degreeOfParallelismSelector,
        Func<TChild, TOutput> transformChild)
    {
        ArgumentNullException.ThrowIfNull(childrenSelector);
        ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
        ArgumentNullException.ThrowIfNull(transformChild);
    
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
            {
                MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
            });
            foreach (var child in childrenSelector(input))
            {
                bool accepted = innerBlock.Post(child);
                if (!accepted) break; // The innerBlock has failed
            }
            innerBlock.Complete();
    
            // Propagate the results
            List<TOutput> results = new();
            while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
                while (innerBlock.TryReceive(out TOutput result))
                    results.Add(result);
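            // Observe the completion, so that a failure of the innerBlock surfaces here
            try { await innerBlock.Completion.ConfigureAwait(false); }
            catch when (innerBlock.Completion.IsCanceled) { throw; } // Preserve cancellation
            catch { innerBlock.Completion.Wait(); } // Propagate AggregateException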
            return results;
        });
    }
    

    Usage example:

    IPropagatorBlock<ItemGroup, Item> block =
        CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
            x => x.Items, x => x.MaxDegreeOfParallelism, x => x);
    

    Note: The above code has not been tested.
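
To address the "how do I wait on it" part of the question, here is a minimal end-to-end sketch that posts the two groups from the question and awaits the pipeline. The names Demo, RunAsync, work and sink, as well as the peak counters, are hypothetical and exist only for illustration. The factory is a condensed copy of the one above (without the AggregateException handling) so that the sketch compiles on its own. The idea is to link the block to a consumer with PropagateCompletion enabled and then await the consumer's Completion:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);

public static class Demo
{
    // Condensed copy of the factory from the answer, so that the sketch is self-contained.
    static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock<TInput, TChild, TOutput>(
        Func<TInput, IEnumerable<TChild>> childrenSelector,
        Func<TInput, int> degreeOfParallelismSelector,
        Func<TChild, TOutput> transformChild)
    {
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            var innerBlock = new TransformBlock<TChild, TOutput>(transformChild,
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
                });
            foreach (var child in childrenSelector(input))
                if (!innerBlock.Post(child)) break;
            innerBlock.Complete();

            var results = new List<TOutput>();
            while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
                while (innerBlock.TryReceive(out var result))
                    results.Add(result);
            await innerBlock.Completion.ConfigureAwait(false);
            return results;
        });
    }

    // Hypothetical driver: processes the two groups and reports the output order,
    // plus the peak number of items that were in flight per group.
    public static async Task<(List<string> Order, int PeakA, int PeakB)> RunAsync()
    {
        var gate = new object();
        int running = 0, peakA = 0, peakB = 0;
        Func<Item, Item> work = item =>
        {
            lock (gate)
            {
                running++;
                if (item.Name[0] == 'A') peakA = Math.Max(peakA, running);
                else peakB = Math.Max(peakB, running);
            }
            Thread.Sleep(50); // Simulated synchronous work
            lock (gate) running--;
            return item;
        };

        var block = CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
            g => g.Items, g => g.MaxDegreeOfParallelism, work);

        // The sink uses the default MaxDegreeOfParallelism (1), so the list is never
        // accessed concurrently.
        var order = new List<string>();
        var sink = new ActionBlock<Item>(item => order.Add(item.Name));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        block.Post(new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1));
        block.Post(new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3));
        block.Complete();
        await sink.Completion; // Completes (or faults) once the whole pipeline has drained

        return (order, peakA, peakB);
    }

    public static async Task Main()
    {
        var (order, peakA, peakB) = await RunAsync();
        Console.WriteLine(string.Join(", ", order)); // A0, A1, A2, B0, B1, B2
        Console.WriteLine($"Peak concurrency: A={peakA}, B={peakB}");
    }
}
```

The groups are processed one at a time because the outer TransformManyBlock keeps its default MaxDegreeOfParallelism of 1, and the output order is preserved because both blocks keep the default EnsureOrdered setting. The peak for the A group should be 1. The peak for the B group can be anywhere between 1 and 3, depending on how quickly the ThreadPool supplies threads.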


    Update: My original implementation (revision 1) was based on the .NET 6 API ReceiveAllAsync and on the TransformManyBlock constructor that takes a Func<TInput,IAsyncEnumerable<TOutput>> argument (.NET 7). The problem was that ReceiveAllAsync doesn't propagate the exception of the enumerated dataflow block, so I switched to collecting and propagating the results manually, by filling a List<TOutput> as shown in this answer.
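
The behavior described in the update can be observed with a small sketch. The FaultDemo class, the failing lambda and the two flags are hypothetical names used only for illustration. A block that throws on one input is enumerated with ReceiveAllAsync, and then its Completion is awaited separately. Under the behavior described above, the enumeration should simply end, and the exception should surface only from the Completion task:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultDemo
{
    public static async Task<(bool EnumerationThrew, bool CompletionThrew)> RunAsync()
    {
        // A block that fails when it processes the value 2.
        var block = new TransformBlock<int, int>(x =>
        {
            if (x == 2) throw new InvalidOperationException("boom");
            return x;
        });
        for (int i = 0; i < 3; i++) block.Post(i);
        block.Complete();

        bool enumerationThrew = false, completionThrew = false;
        int received = 0;
        try
        {
            // How many items are observed before the block faults is timing-dependent,
            // because a faulted block discards its buffered output.
            await foreach (var item in block.ReceiveAllAsync()) received++;
        }
        catch (InvalidOperationException) { enumerationThrew = true; }

        // The failure is observable through the Completion task of the block.
        try { await block.Completion; }
        catch (InvalidOperationException) { completionThrew = true; }

        return (enumerationThrew, completionThrew);
    }

    public static async Task Main()
    {
        var (enumerationThrew, completionThrew) = await RunAsync();
        Console.WriteLine($"Enumeration threw: {enumerationThrew}");
        Console.WriteLine($"Completion threw: {completionThrew}");
    }
}
```

This is why the factory in this answer awaits innerBlock.Completion after draining the output, instead of relying on the enumeration alone to report a failure.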