c#, task-parallel-library, tpl-dataflow

Converting my C# BlockingCollection based code to TPL dataflow


I have a specific problem that I am sure can be solved using TPL Dataflow. I am new to it, so I need your help to speed up my understanding. My code currently looks like this:

[Image: Current code]

where Process1, Process2, and Process3 are each a Task. Objects are passed from one block to the next through a BlockingCollection. I would like to do this:

[Image: Need like this]

I have read about TransformBlock, ActionBlock, and BatchBlock. Can you help me use these classes to achieve the design above?


Solution

  • You didn't provide any details on what your Process2 and Process3 blocks do with the input data, so I assumed that each of them applies its own transformation to the original objects and that the output list contains the combined results of both processes.

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace TPLDataFlowExample1
    {
        class Program
        {
            static void Main(string[] args)
            {
                var inputListOfObjects = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    
                // Process1 block
                var process1 = new TransformBlock<int, int>(i => i * 2);
                // Broadcast block which passes objects to Process2 and Process3
                var broadCast = new BroadcastBlock<int>(null);
                // Process2 block
                var process2 = new TransformBlock<int, string>(i => $"Process 2: {i}");
                // Process3 block
                var process3 = new TransformBlock<int, string>(i => $"Process 3: {i}");
                // Just simple action block which will print the result
                var print = new ActionBlock<string>(s => Console.WriteLine(s));
    
                // Link the output of Process1 block with the input of Broadcast block. Propagate completion to the next block.
                process1.LinkTo(broadCast, new DataflowLinkOptions { PropagateCompletion = true });
                // Link the output of Broadcast block with the input of Process2 block. Propagate completion to the next block.
                broadCast.LinkTo(process2, new DataflowLinkOptions { PropagateCompletion = true });
                // Link the output of Broadcast block with the input of Process3 block. Propagate completion to the next block.
                broadCast.LinkTo(process3, new DataflowLinkOptions { PropagateCompletion = true });
                // Link the output of Process2 block with the input of Print block. 
                process2.LinkTo(print);
                // Link the output of Process3 block with the input of Print block.
                process3.LinkTo(print);
    
                // We didn't propagate completion to Print block because it must complete when both Process2 and Process3 blocks are in Completed state.
                Task.WhenAll(process2.Completion, process3.Completion).ContinueWith(_ => print.Complete());
    
                // Post data to the Process1 block
                foreach (var obj in inputListOfObjects)
                {
                    process1.Post(obj);
                }
    
                // Mark the Process1 block as complete
                process1.Complete();
                // Wait for the last block to process all messages
                print.Completion.Wait();
            }
        }
    }
    
    
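    // Output (Process2 and Process3 run concurrently, so the interleaving of their lines may differ between runs):
    //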
    // Process 2: 2
    // Process 3: 2
    // Process 3: 4
    // Process 3: 6
    // Process 3: 8
    // Process 3: 10
    // Process 3: 12
    // Process 3: 14
    // Process 3: 16
    // Process 3: 18
    // Process 3: 20
    // Process 2: 4
    // Process 2: 6
    // Process 2: 8
    // Process 2: 10
    // Process 2: 12
    // Process 2: 14
    // Process 2: 16
    // Process 2: 18
    // Process 2: 20
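If "joined results" should mean that each Process2 output is paired with the Process3 output for the same input item, rather than both streams being printed independently, a JoinBlock can do the pairing. The following is only a sketch under that assumption: the class name JoinedPipelineSketch, the RunAsync helper, and the "left | right" result format are made up for illustration, and the transformations are the same placeholders as above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JoinedPipelineSketch
{
    // Hypothetical helper: pushes the input through the pipeline and
    // returns one combined string per input item.
    public static async Task<List<string>> RunAsync(IEnumerable<int> input)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var process1 = new TransformBlock<int, int>(i => i * 2);
        // Identity cloning function: ints are value types, so no real copy is needed.
        var broadcast = new BroadcastBlock<int>(i => i);
        var process2 = new TransformBlock<int, string>(i => $"Process 2: {i}");
        var process3 = new TransformBlock<int, string>(i => $"Process 3: {i}");

        // Pairs the n-th output of Process2 with the n-th output of Process3.
        var join = new JoinBlock<string, string>();

        // ActionBlock runs one message at a time by default,
        // so adding to a plain List<string> is safe here.
        var results = new List<string>();
        var collect = new ActionBlock<Tuple<string, string>>(
            pair => results.Add($"{pair.Item1} | {pair.Item2}"));

        process1.LinkTo(broadcast, link);
        broadcast.LinkTo(process2, link);
        broadcast.LinkTo(process3, link);
        process2.LinkTo(join.Target1, link);
        process3.LinkTo(join.Target2, link);
        join.LinkTo(collect, link);

        foreach (var item in input)
        {
            process1.Post(item);
        }

        process1.Complete();
        await collect.Completion;
        return results;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync(new[] { 1, 2, 3 }))
        {
            Console.WriteLine(line);
        }
    }
}
```

Because the JoinBlock has a single downstream link, completion can be propagated along the whole chain and the Task.WhenAll continuation is not needed. The pairing relies on both TransformBlocks keeping their default ordering and unbounded capacity; with a BoundedCapacity set on Process2 or Process3, a BroadcastBlock may drop messages for a slow target and the pairs would no longer line up.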