I wrote a little program to test using BufferBlock (System.Threading.Tasks.Dataflow) to implement a dual-priority consumer-producer queue.
The consumer should always use any items from the high-priority queue first.
In this initial test, I have the producer running at a much slower rate than the consumer, so the data should just come out in the same order it went in, regardless of priority.
However, I find that the result of Task.WhenAny()
is not completing until there is something in both queues (or there's a completion), thus acting like Task.WhenAll()
.
I thought I understood async
/await
, and I've perused Cleary's "Concurrency in C# Cookbook." However, there is something going on that I'm not understanding.
Any ideas?
Code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // add nuget package, 4.8.0
using static System.Console;
namespace DualBufferBlockExample { // .Net Framework 4.6.1
class Program {
private static async Task Produce(BufferBlock<int> queueLo, BufferBlock<int> queueHi, IEnumerable<int> values) {
await Task.Delay(10);
foreach(var value in values) {
if(value == 3 || value == 7)
await queueHi.SendAsync(value);
else
await queueLo.SendAsync(value);
WriteLine($"Produced {value} qL.Cnt={queueLo.Count} qH.Cnt={queueHi.Count}");
await Task.Delay(1000); // production lag
}
queueLo.Complete();
queueHi.Complete();
}
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queueLo, BufferBlock<int> queueHi) {
var results = new List<int>();
while(true) {
int value = -1;
while(queueLo.Count > 0 || queueHi.Count > 0) { // take from hi-priority first
if(queueHi.TryReceive(out value) ||
queueLo.TryReceive(out value)) { // process value
results.Add(value);
WriteLine($" Consumed {value}");
await Task.Delay(100); // consumer processing time shorter than production
}
}
var hasNorm = queueHi.OutputAvailableAsync();
var hasLow = queueLo.OutputAvailableAsync();
var anyT = await Task.WhenAny(hasNorm, hasLow); // <<<<<<<<<< behaves like WhenAll
WriteLine($" WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");
if(!anyT.Result)
break; // both queues are empty & complete
}
return results;
}
static async Task TestDataFlow() {
var queueLo = new BufferBlock<int>();
var queueHi = new BufferBlock<int>();
// Start the producer and consumer.
var consumer = Consume(queueLo, queueHi);
WriteLine("Consumer Started");
var producer = Produce(queueLo, queueHi, Enumerable.Range(0, 10));
WriteLine("Producer Started");
// Wait for everything to complete.
await Task.WhenAll(producer, consumer, queueLo.Completion, queueHi.Completion);
// show consumer's output
var results = await consumer;
Write("Results:");
foreach(var x in results)
Write($" {x}");
WriteLine();
}
static void Main(string[] args) {
try {
TestDataFlow().Wait();
} catch(Exception ex) {
WriteLine($"TestDataFlow exception: {ex.ToString()}");
}
ReadLine();
}
}
}
Output:
Consumer Started
Producer Started
Produced 0 qL.Cnt=1 qH.Cnt=0
Produced 1 qL.Cnt=2 qH.Cnt=0
Produced 2 qL.Cnt=3 qH.Cnt=0
Produced 3 qL.Cnt=3 qH.Cnt=1
WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
Consumed 3
Consumed 0
Consumed 1
Consumed 2
Produced 4 qL.Cnt=1 qH.Cnt=0
Produced 5 qL.Cnt=2 qH.Cnt=0
Produced 6 qL.Cnt=3 qH.Cnt=0
Produced 7 qL.Cnt=3 qH.Cnt=1
WhenAny True qL.Result=True qH.Result=True qL.Count=3 qH.Count=1
Consumed 7
Consumed 4
Consumed 5
Consumed 6
Produced 8 qL.Cnt=1 qH.Cnt=0
Produced 9 qL.Cnt=2 qH.Cnt=0
WhenAny True qL.Result=True qH.Result=False qL.Count=2 qH.Count=0
Consumed 8
Consumed 9
WhenAny False qL.Result=False qH.Result=False qL.Count=0 qH.Count=0
Results: 3 0 1 2 7 4 5 6 8 9
After calling WhenAny
your immediately blocking on both tasks using .Result
without knowing that they're both complete.
var anyT = await Task.WhenAny(hasNorm, hasLow);
//This line blocks on both the hasNorm and hasLow tasks preventing execution from continuing.
WriteLine($" WhenAny {anyT.Result} qL.Result={hasLow.Result} qH.Result={hasNorm.Result} qL.Count={queueLo.Count} qH.Count={queueHi.Count}");
awaiting
both tasks will also give you the same behavior. The best you can do is await
the task returned from WhenAny
and only print the results from the completed task.
Additionally, a priority queue is not something that TPL-Dataflow
does well out of the box. It treats all messages equally so you end plugging in your own priority implementation. That said you can make it work.