c#, task-parallel-library, tpl-dataflow

Two-branched data flow network does not complete


This dataflow network has a single bifurcation and produces the correct output. Why does it never complete?

            // Connect multiple blocks
            // source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
            //                                                        |-> multiply2 -> writeListOut
            var source = new BufferBlock<List<int>>();
            var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
            {
                return l.Select(_l => (double)_l).ToList();
            });
            source.LinkTo(convertToDouble);
            Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
            {
                return l.Select(_l => _l * 10.0).ToList();
            };
            var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            convertToDouble.LinkTo(multiply);
            var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
            {
                return l;
            });
            multiply.LinkTo(multiplyBuffer);
            var summation = new TransformBlock<List<double>, double>((List<double> l) =>
            {
                return l.Sum();
            });
            multiplyBuffer.LinkTo(summation);
            var writeOut = new ActionBlock<double>((double d) =>
            {
                Console.WriteLine("Writing out: " + d.ToString());
            });
            summation.LinkTo(writeOut);
            var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            multiplyBuffer.LinkTo(multiply2);
            var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
            {
                Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l => 
                    _l.ToString()).ToList()));
            });
            multiply2.LinkTo(writeListOut);

            source.Post(new List<int> { 1, 2, 3 });

            Task.Run(async () =>
            {
                await Task.Delay(3000);
                Console.WriteLine("posting 2nd...");
                source.Post(new List<int> { 4, 5, 6 });
                source.Complete();
            });

            // Never completes
            try
            {
                writeOut.Completion.Wait();
                writeListOut.Completion.Wait();
            }
            catch (AggregateException ex)
            {
                ex.Handle(e =>
                {
                    Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

I've noticed that if the Completion.Wait() calls are omitted, the program returns. No errors are observed when the network runs.

Sample output:

    Writing list out: 100, 200, 300
    Writing out: 60
    posting 2nd...
    Writing out: 150
    Writing list out: 400, 500, 600
    (hangs)

Expected output:

    Writing list out: 100, 200, 300
    Writing out: 60
    posting 2nd...
    Writing out: 150
    Writing list out: 400, 500, 600
    (returns)


Solution

  • In TPL Dataflow, a block's completion is not propagated to its linked targets by default.

    You need to construct a System.Threading.Tasks.Dataflow.DataflowLinkOptions, set its PropagateCompletion property to true, and pass it to each of your LinkTo calls.

    Alternatively, you can call Complete on each block in order, waiting for each upstream block's Completion task to finish before completing the block that follows it, so that messages still in flight are not rejected.
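
The first option might look roughly like the sketch below, which rebuilds the network from the question with a single shared DataflowLinkOptions instance passed to every LinkTo call. The class name PropagateCompletionSketch, the RunAsync helper, and the collected output list are illustrative assumptions, not part of the original code; both inputs are also posted up front instead of after a delay, to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper class; the name is an assumption for this sketch.
public static class PropagateCompletionSketch
{
    public static async Task<List<string>> RunAsync()
    {
        var output = new List<string>();
        var gate = new object(); // the two ActionBlocks may run concurrently

        // One options instance, reused for every link in the network.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<List<int>>();
        var convertToDouble = new TransformBlock<List<int>, List<double>>(
            l => l.Select(x => (double)x).ToList());
        Func<List<double>, List<double>> multiplyFunc =
            l => l.Select(x => x * 10.0).ToList();
        var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
        var multiplyBuffer = new BroadcastBlock<List<double>>(l => l);
        var summation = new TransformBlock<List<double>, double>(l => l.Sum());
        var writeOut = new ActionBlock<double>(d =>
        {
            lock (gate) { output.Add("Writing out: " + d); }
        });
        var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
        var writeListOut = new ActionBlock<List<double>>(l =>
        {
            lock (gate) { output.Add("Writing list out: " + string.Join(", ", l)); }
        });

        // Same topology as the question, but every link propagates completion.
        source.LinkTo(convertToDouble, linkOptions);
        convertToDouble.LinkTo(multiply, linkOptions);
        multiply.LinkTo(multiplyBuffer, linkOptions);
        multiplyBuffer.LinkTo(summation, linkOptions);
        summation.LinkTo(writeOut, linkOptions);
        multiplyBuffer.LinkTo(multiply2, linkOptions);
        multiply2.LinkTo(writeListOut, linkOptions);

        source.Post(new List<int> { 1, 2, 3 });
        source.Post(new List<int> { 4, 5, 6 });
        source.Complete(); // now flows through to both terminal blocks

        // Wait for both branches; each finishes once its upstream completes.
        await Task.WhenAll(writeOut.Completion, writeListOut.Completion);
        return output;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

The order of lines within each branch is preserved, but the interleaving between the two branches is not deterministic, so the combined output may differ from run to run.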