Tags: c#, mapreduce, .net-6.0, tpl-dataflow

MapReduce using the TPL Dataflow library


I am trying to implement a classic map-reduce problem using System.Threading.Tasks.Dataflow, and although I can get something (sort of) working, I'm struggling to see how to generalise this functionality.

Given a simple problem:

  • Produce a stream of integers and, in parallel, for each number:
    • Square the number
    • Add 5
    • Divide by 2
  • Take the sum of all the results
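As a reference point for checking any pipeline, here is the same computation done sequentially with LINQ. It is only an illustrative baseline, not part of the question's code. The squares of 1..10 sum to 385, so for an input of 10 the expected total is (385 + 5 * 10) / 2 = 217.5.

```csharp
using System;
using System.Globalization;
using System.Linq;

// Sequential reference (no Dataflow): for each x in 1..n compute
// (x * x + 5) / 2.0, then sum all the results.
int n = 10;
double expected = Enumerable.Range(1, n)
    .Select(x => x * x)    // square
    .Select(x => x + 5)    // add 5
    .Select(x => x / 2.0)  // divide by 2 (floating point, like divTwoBlock)
    .Sum();

// Invariant culture so the decimal separator is always '.'
Console.WriteLine(expected.ToString(CultureInfo.InvariantCulture)); // 217.5
```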

The problem I'm having is that I can get this working using a BatchBlock, but I have to specify the batch size up front, i.e. how many items the parallel stage will produce. This is fine for the test code (below), as I know upfront how many items I'm going to queue, but say I didn't know... how would I set this pipeline up?

Test code used (note that I added a short delay to the first of the "parallel" blocks just to see how the processing time changes with different degrees of parallelism):

using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

var input = 10;

var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
    return Enumerable.Range(1, x);
});

var squareBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100);
    return x * x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var addFiveBlock = new TransformBlock<int, int>(x =>
{
    return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var divTwoBlock = new TransformBlock<int, double>(x =>
{
    return x/2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var batchBlock = new BatchBlock<double>(input);

var sumBlock = new TransformBlock<IList<double>,double>(x =>
{
    return x.Sum();
});

var options = new DataflowLinkOptions { PropagateCompletion = true };

fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);


var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();


var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");

await sumBlock.Completion;

Solution

  • One idea is to configure the BatchBlock<T> with the maximum batchSize:

    var batchBlock = new BatchBlock<double>(Int32.MaxValue);
    

    When the batchBlock is completed (when its Complete method is invoked), it will emit a final batch containing all the messages it holds. The disadvantage is that by buffering every message, you might run out of memory if the number of messages is huge. Or, if the number of messages is larger than Int32.MaxValue and miraculously you don't run out of memory, you'll get more than one batch, which, given the logic you are trying to implement, would be a bug.
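    A minimal sketch of this first idea, assuming a .NET 6 program with top-level statements: it is the question's pipeline with the artificial delay dropped and only the BatchBlock size changed, so the number of items no longer has to be known in advance. Note that BatchBlock<T> emits T[], so the summing block here takes double[].

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var link = new DataflowLinkOptions { PropagateCompletion = true };
var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

var fanOutBlock = new TransformManyBlock<int, int>(n => Enumerable.Range(1, n));
var squareBlock = new TransformBlock<int, int>(x => x * x, parallel);
var addFiveBlock = new TransformBlock<int, int>(x => x + 5, parallel);
var divTwoBlock = new TransformBlock<int, double>(x => x / 2.0, parallel);

// The batch never fills up in practice; completion flushes the partial batch.
var batchBlock = new BatchBlock<double>(Int32.MaxValue);
var sumBlock = new TransformBlock<double[], double>(batch => batch.Sum());

fanOutBlock.LinkTo(squareBlock, link);
squareBlock.LinkTo(addFiveBlock, link);
addFiveBlock.LinkTo(divTwoBlock, link);
divTwoBlock.LinkTo(batchBlock, link);
batchBlock.LinkTo(sumBlock, link);

fanOutBlock.Post(10);
fanOutBlock.Complete();

// A single batch arrives once completion has propagated to batchBlock.
double total = await sumBlock.ReceiveAsync();
await sumBlock.Completion;
Console.WriteLine(total);
```

    Because the batch size is never reached, the only batch that reaches sumBlock is the one emitted when completion propagates to batchBlock, which is exactly why this variant buffers every message until the end.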

    A better idea is to implement a custom Dataflow block that aggregates the messages it receives on the fly, similar to the LINQ Aggregate operator:

    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> function,
        Func<TAccumulate, TResult> resultSelector);
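    For comparison, this is how the LINQ operator itself is used with the question's values. The mean calculation is an illustrative extra, not from the question; it shows why a separate resultSelector is useful, since the accumulator type can differ from both the element type and the result type.

```csharp
using System;
using System.Linq;

double[] values = Enumerable.Range(1, 10).Select(x => (x * x + 5) / 2.0).ToArray();

// seed, accumulating function, result selector (identity here)
double sum = values.Aggregate(
    0.0,
    (acc, x) => acc + x,
    acc => acc);

// Accumulator is a (Sum, Count) tuple; the selector turns it into the mean.
double mean = values.Aggregate(
    (Sum: 0.0, Count: 0),
    (acc, x) => (acc.Sum + x, acc.Count + 1),
    acc => acc.Sum / acc.Count);

Console.WriteLine($"{sum} {mean}");
```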
    

    Here is an implementation composed of two built-in blocks, which are encapsulated into a single block with the DataflowBlock.Encapsulate method:

    public static IPropagatorBlock<TSource, TResult>
        CreateAggregateBlock<TSource, TAccumulate, TResult>(
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> function,
        Func<TAccumulate, TResult> resultSelector,
        ExecutionDataflowBlockOptions options = default)
    {
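        options ??= new ExecutionDataflowBlockOptions();
        var maxDOP = options.MaxDegreeOfParallelism;
        options.MaxDegreeOfParallelism = 1; // The accumulator must be updated serially
    
        // Folds each incoming item into the accumulator (the captured seed).
        var inputBlock = new ActionBlock<TSource>(item =>
        {
            seed = function(seed, item);
        }, options);
    
        // Receives the final accumulator once and projects it to the result.
        var outputBlock = new TransformBlock<TAccumulate, TResult>(accumulate =>
        {
            return resultSelector(accumulate);
        }, options);
    
        options.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    
        // When inputBlock completes successfully, post the final accumulator
        // to outputBlock, then complete (or fault) outputBlock.
        PropagateCompletion(inputBlock, outputBlock, () =>
        {
            outputBlock.Post(seed);
        });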
    
        return DataflowBlock.Encapsulate(inputBlock, outputBlock);
    
        static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
            Action onSuccessfulCompletion)
        {
            ThreadPool.QueueUserWorkItem(async _ =>
            {
                try { await source.Completion; } catch { }
                Exception exception =
                    source.Completion.IsFaulted ? source.Completion.Exception : null;
                if (source.Completion.IsCompletedSuccessfully)
                {
                    // The action is invoked before completing the target.
                    try { onSuccessfulCompletion(); }
                    catch (Exception ex) { exception = ex; }
                }
                if (exception != null) target.Fault(exception); else target.Complete();
            });
        }
    }
    

    A tricky part is how to propagate the completion from one block to the other. My preferred technique is to run an async void delegate on the thread pool (ThreadPool.QueueUserWorkItem with an async lambda). This way any bug in my code will surface as an unhandled exception that crashes the process. The alternative is to put the code in a fire-and-forget task continuation, in which case the effect of a bug will most likely be a silent deadlock.
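    To make the deadlock scenario concrete, here is a small sketch of the fire-and-forget continuation alternative, with a simulated bug inside a ContinueWith continuation. SimulatedBug is a hypothetical helper standing in for a coding error, and the one-second timeout is an arbitrary choice for the demonstration. The exception is stored in the discarded continuation task, so nothing crashes and the target block never completes.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var target = new BufferBlock<int>();

// Completion propagated by a fire-and-forget continuation.
_ = source.Completion.ContinueWith(t =>
{
    SimulatedBug();    // throws; the exception is stored in the discarded task
    target.Complete(); // never reached
}, TaskScheduler.Default);

source.Complete();

// Nothing crashes, but the target is still pending after the timeout.
var first = await Task.WhenAny(target.Completion, Task.Delay(1000));
bool targetCompleted = first == target.Completion;
Console.WriteLine(targetCompleted ? "target completed" : "target still pending");

// Hypothetical stand-in for a bug in the propagation code.
static void SimulatedBug() => throw new InvalidOperationException("Simulated bug");
```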

    Another open question is whether the mutations of the seed state are visible to all threads involved in the calculation. I've avoided explicit barriers or locks, and I am relying on the implicit barriers that the TPL includes when tasks are queued, and at the beginning and end of task execution.

    Usage example:

    var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
        (acc, x) => acc + x, acc => acc);
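    Putting it together: a sketch of the question's pipeline where batchBlock and the original sumBlock are replaced by the aggregate block, so no item count is needed anywhere. CreateAggregateBlock is copied from above as a local function only to keep the example self-contained in a .NET 6 top-level program; the wiring around it is an assumption about how it would be plugged in.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var link = new DataflowLinkOptions { PropagateCompletion = true };
var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

var fanOutBlock = new TransformManyBlock<int, int>(n => Enumerable.Range(1, n));
var squareBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100); // same artificial delay as in the question
    return x * x;
}, parallel);
var addFiveBlock = new TransformBlock<int, int>(x => x + 5, parallel);
var divTwoBlock = new TransformBlock<int, double>(x => x / 2.0, parallel);
var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
    (acc, x) => acc + x, acc => acc);

fanOutBlock.LinkTo(squareBlock, link);
squareBlock.LinkTo(addFiveBlock, link);
addFiveBlock.LinkTo(divTwoBlock, link);
divTwoBlock.LinkTo(sumBlock, link); // no BatchBlock, no item count

fanOutBlock.Post(10);
fanOutBlock.Complete();

double result = await sumBlock.ReceiveAsync();
await sumBlock.Completion;
Console.WriteLine(result);

static IPropagatorBlock<TSource, TResult>
    CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector,
    ExecutionDataflowBlockOptions options = default)
{
    options ??= new ExecutionDataflowBlockOptions();
    var maxDOP = options.MaxDegreeOfParallelism;
    options.MaxDegreeOfParallelism = 1; // accumulate serially

    var inputBlock = new ActionBlock<TSource>(item =>
    {
        seed = function(seed, item);
    }, options);

    var outputBlock = new TransformBlock<TAccumulate, TResult>(
        accumulate => resultSelector(accumulate), options);

    options.MaxDegreeOfParallelism = maxDOP; // restore the caller's value

    // Post the final accumulator, then complete (or fault) the output block.
    PropagateCompletion(inputBlock, outputBlock, () => outputBlock.Post(seed));

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
        Action onSuccessfulCompletion)
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await source.Completion; } catch { }
            Exception exception =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (source.Completion.IsCompletedSuccessfully)
            {
                try { onSuccessfulCompletion(); }
                catch (Exception ex) { exception = ex; }
            }
            if (exception != null) target.Fault(exception); else target.Complete();
        });
    }
}
```

    The aggregate block only produces its single result after divTwoBlock's completion has propagated into it, so awaiting ReceiveAsync is enough to get the final sum without knowing how many items flowed through.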