TPL Block with round robin link?


I use TPL Dataflow a lot and have a large dataflow pipeline structure.
As part of the pipeline network, I want to write some data to Azure Blob Storage. We have a lot of data, so we have 4 storage accounts and want to distribute the data evenly between them.

I want to keep using the dataflow pipeline pattern, so I want to implement a source block that, when linked to several target blocks, sends messages to them in round-robin order. BufferBlock is not good enough because it sends each message to the first block that accepts it, and assuming all the target blocks have a large bounded capacity, all the messages will go to the first target block. BroadcastBlock is not suitable either, because I don't want duplicates.

Any recommendations? Implementing the ISourceBlock interface with round-robin behavior does not seem simple, so I wonder whether there are simpler solutions out there, or any extensions to TPL that I am not familiar with.


Solution

  • Are you aware that you can link blocks with a predicate? Here is a very simple and not well-tested sample solution:

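    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Assumed helper (its definition is not shown in this answer): `turn` is the
    // index of the consumer that should take the next message.
    var turn = 0;
    bool Predicate(int index)
    {
        if (turn != index) return false;
        turn = (turn + 1) % 4; // advance to the next consumer
        return true;
    }

    var buffer = new BufferBlock<int>();
    var consumer1 = new ActionBlock<int>(i => Console.WriteLine($"First: {i}"));
    var consumer2 = new ActionBlock<int>(i => Console.WriteLine($"Second: {i}"));
    var consumer3 = new ActionBlock<int>(i => Console.WriteLine($"Third: {i}"));
    var consumer4 = new ActionBlock<int>(i => Console.WriteLine($"Fourth: {i}"));

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    buffer.LinkTo(consumer1, options, i => Predicate(0));
    buffer.LinkTo(consumer2, options, i => Predicate(1));
    buffer.LinkTo(consumer3, options, i => Predicate(2));
    buffer.LinkTo(consumer4, options, i => Predicate(3));
    buffer.LinkTo(DataflowBlock.NullTarget<int>()); // catch-all for rejected messages

    for (var i = 0; i < 10; ++i)
    {
        buffer.Post(i);
    }
    buffer.Complete(); // without this, the Completion tasks never finish
    Task.WaitAll(consumer1.Completion, consumer2.Completion, consumer3.Completion, consumer4.Completion);
    

    One possible output (the order across consumers varies between runs):

    Third: 2
    First: 0
    Fourth: 3
    Second: 1
    Second: 5
    Second: 9
    Third: 6
    Fourth: 7
    First: 4
    First: 8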
    

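    What is going on here is that Predicate maintains a counter of whose turn it is: if the counter matches the consumer's index, the predicate accepts the message and advances the counter. Note that you should still link the block at least once without any predicate; otherwise a message that no predicate accepts stays at the head of the buffer, blocks everything behind it, and memory keeps growing. It is also a good idea to test the round robin with a final block that monitors lost messages, because anything that reaches the catch-all link is silently dropped. This can matter with bounded consumers: a full consumer declines the message after the counter has already advanced, so a message may skip a turn or fall through.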
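
The advice about monitoring lost messages could be sketched roughly as follows. This is only an illustration under a few assumptions: the names `RoundRobinSketch`, `LinkRoundRobin` and `DemoAsync` are made up for this example, the catch-all `NullTarget` is swapped for an `ActionBlock` that counts what falls through, and the counter predicate is the same guess at `Predicate` as described above, generalized to any number of targets.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RoundRobinSketch
{
    // Hypothetical helper: links `source` to every target with a turn-based
    // predicate and adds a counting catch-all link. The returned task yields
    // the number of messages that no target took, once the source completes.
    public static async Task<int> LinkRoundRobin<T>(
        ISourceBlock<T> source, params ITargetBlock<T>[] targets)
    {
        var turn = 0;    // index of the target expected to take the next message
        var dropped = 0; // messages that fell through to the catch-all link

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        for (var n = 0; n < targets.Length; n++)
        {
            var index = n; // copy, so each lambda captures its own index
            source.LinkTo(targets[index], options, _ =>
            {
                if (turn != index) return false;
                turn = (turn + 1) % targets.Length;
                return true;
            });
        }

        // Monitoring block instead of NullTarget: counts rather than discards.
        // ActionBlock handles one message at a time by default, so ++ is safe.
        var monitor = new ActionBlock<T>(_ => dropped++);
        source.LinkTo(monitor, options);

        // All links above are already in place when the caller gets the task,
        // because an async method runs synchronously up to its first await.
        await monitor.Completion;
        return dropped;
    }

    // Hypothetical demo: four unbounded consumers that count what they receive.
    public static async Task<(int[] PerConsumer, int Dropped)> DemoAsync(int messages)
    {
        var perConsumer = new int[4];
        var consumers = new ITargetBlock<int>[4];
        for (var n = 0; n < consumers.Length; n++)
        {
            var index = n;
            consumers[index] = new ActionBlock<int>(_ => perConsumer[index]++);
        }

        var buffer = new BufferBlock<int>();
        var dropped = LinkRoundRobin(buffer, consumers);

        for (var i = 0; i < messages; i++) buffer.Post(i);
        buffer.Complete();

        foreach (var consumer in consumers) await consumer.Completion;
        return (perConsumer, await dropped);
    }
}
```

Calling `await RoundRobinSketch.DemoAsync(8)` with these unbounded consumers should give two messages per consumer and no drops. Setting a small `BoundedCapacity` on the consumers is a quick way to check whether the dropped count stays at zero under back-pressure.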