Search code examples
c#async-awaitpriority-queuetpl-dataflowbufferblock

Task.WhenAny acting like Task.WhenAll


I wrote a little program to test using BufferBlock (System.Threading.Tasks.Dataflow) to implement a dual-priority consumer-producer queue.

The consumer should always use any items from the high-priority queue first.

In this initial test, I have the producer running at a much slower rate than the consumer, so the data should just come out in the same order it went in, regardless of priority.

However, I find that the result of Task.WhenAny() is not completing until there is something in both queues (or there's a completion), thus acting like Task.WhenAll().

I thought I understood async/await, and I've perused Cleary's "Concurrency in C# Cookbook." However, there is something going on that I'm not understanding.

Any ideas?

Code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;  // add nuget package, 4.8.0
using static System.Console;

namespace DualBufferBlockExample { // .Net Framework 4.6.1

    class Program {
        private static async Task Produce(BufferBlock<int> queueLo, BufferBlock<int> queueHi, IEnumerable<int> values) {
            await Task.Delay(10);
            foreach(var value in values) {
                if(value == 3 || value == 7)
                    await queueHi.SendAsync(value);
                else
                    await queueLo.SendAsync(value);
                WriteLine($"Produced {value}  qL.Cnt={queueLo.Count} qH.Cnt={queueHi.Count}");
                await Task.Delay(1000);  // production lag
            }

            queueLo.Complete();
            queueHi.Complete();
        }
        private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queueLo, BufferBlock<int> queueHi) {
            var results = new List<int>();

            while(true) {
                int value = -1;

                while(queueLo.Count > 0 || queueHi.Count > 0) {  // take from hi-priority first
                    if(queueHi.TryReceive(out value) ||
                        queueLo.TryReceive(out value)) {  // process value
                        results.Add(value);
                        WriteLine($"    Consumed {value}");
                        await Task.Delay(100); // consumer processing time shorter than production
                    }
                }

                var hasNorm = queueHi.OutputAvailableAsync();
                var hasLow = queueLo.OutputAvailableAsync();
                var anyT = await Task.WhenAny(hasNorm, hasLow);  // <<<<<<<<<< behaves like WhenAll
                WriteLine($"  WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");

                if(!anyT.Result)
                    break;  // both queues are empty & complete
            }

            return results;
        }
        static async Task TestDataFlow() {
            var queueLo = new BufferBlock<int>();
            var queueHi = new BufferBlock<int>();

            // Start the producer and consumer.
            var consumer = Consume(queueLo, queueHi);
            WriteLine("Consumer Started");

            var producer = Produce(queueLo, queueHi, Enumerable.Range(0, 10));
            WriteLine("Producer Started");

            // Wait for everything to complete.
            await Task.WhenAll(producer, consumer, queueLo.Completion, queueHi.Completion);

            // show consumer's output
            var results = await consumer;

            Write("Results:");
            foreach(var x in results)
                Write($" {x}");
            WriteLine();
        }
        static void Main(string[] args) {
            try {
                TestDataFlow().Wait();
            } catch(Exception ex) {
                WriteLine($"TestDataFlow exception: {ex.ToString()}");
            }
            ReadLine();
        }
    }
}

Output:

Consumer Started
Producer Started
Produced 0  qL.Cnt=1 qH.Cnt=0
Produced 1  qL.Cnt=2 qH.Cnt=0
Produced 2  qL.Cnt=3 qH.Cnt=0
Produced 3  qL.Cnt=3 qH.Cnt=1
  WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
    Consumed 3
    Consumed 0
    Consumed 1
    Consumed 2
Produced 4  qL.Cnt=1 qH.Cnt=0
Produced 5  qL.Cnt=2 qH.Cnt=0
Produced 6  qL.Cnt=3 qH.Cnt=0
Produced 7  qL.Cnt=3 qH.Cnt=1
  WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
    Consumed 7
    Consumed 4
    Consumed 5
    Consumed 6
Produced 8  qL.Cnt=1 qH.Cnt=0
Produced 9  qL.Cnt=2 qH.Cnt=0
  WhenAny True qL.Result=True qH.Result=False qL.Count=2 qH.Count=0
    Consumed 8
    Consumed 9
  WhenAny False qL.Result=False qH.Result=False qL.Count=0 qH.Count=0
Results: 3 0 1 2 7 4 5 6 8 9

Solution

  • After calling WhenAny your immediately blocking on both tasks using .Result without knowing that they're both complete.

    var anyT = await Task.WhenAny(hasNorm, hasLow);
    
    //This line blocks on both the hasNorm and hasLow tasks preventing execution from continuing. 
    WriteLine($"  WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");
    

    awaiting both tasks will also give you the same behavior. The best you can do is await the task returned from WhenAny and only print the results from the completed task.

    Additionally, a priority queue is not something that TPL-Dataflow does well out of the box. It treats all messages equally so you end plugging in your own priority implementation. That said you can make it work.