Typical situation: Fast producer, slow consumer, need to slow producer down.
Sample code that doesn't work as I expected (explained below):
// I assumed this block will behave like BlockingCollection, but it doesn't
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});
// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});
// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});
bb.LinkTo(ab);
ab.Completion.Wait();
How I thought this code would work, but it doesn't:
- BufferBlock bb is the blocking queue with a capacity of 3. Once the capacity is reached, the producer should not be able to .Post() to it until there's a vacant slot.
  In reality, bb seems to happily accept any number of messages.
- producer is a task that quickly Posts messages. Once all messages have been posted, the call to bb.Complete() should propagate through the pipeline and signal shutdown once all messages have been processed. Hence the ab.Completion.Wait(); at the end.
  In reality, once .Complete() is called, action block ab won't receive any more messages.
This can be done with a BlockingCollection, which I thought BufferBlock was the equivalent of in the TPL Dataflow (TDF) world. I guess I'm misunderstanding how backpressure is supposed to work in TPL Dataflow.
So where's the catch? How do I run this pipeline without allowing more than 3 messages in the buffer bb, and wait for its completion?
PS: I found this gist (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938) where it's suggested to maintain a semaphore to block writing to the BufferBlock. I thought this was "built-in".
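For reference, a bounded block does enforce its capacity; what differs from BlockingCollection is how a full block responds. As far as I can tell, Post() never blocks and simply returns false when the block declines the message, while SendAsync() returns a task that stays pending until the block has room. Below is a minimal sketch of that difference. The class and method names (PostVersusSendAsync, CountAcceptedPosts, SendAsyncWaitsForSpaceAsync) are made up for illustration, and it assumes System.Threading.Tasks.Dataflow is available (a NuGet package on .NET Framework).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    // Posts `count` items to an unlinked bounded buffer and returns
    // how many of them Post() actually accepted.
    public static int CountAcceptedPosts(int capacity, int count)
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = capacity });
        int accepted = 0;
        for (int i = 0; i < count; i++)
        {
            // Post() does not wait: it returns false when the block is full.
            if (buffer.Post(i)) accepted++;
        }
        return accepted;
    }

    // Returns true if SendAsync() on a full buffer stays pending
    // until an item is taken out, and is then accepted.
    public static async Task<bool> SendAsyncWaitsForSpaceAsync()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });
        await buffer.SendAsync(1);                 // fills the only slot
        Task<bool> pending = buffer.SendAsync(2);  // no room yet
        bool wasPending = !pending.IsCompleted;
        int first = buffer.Receive();              // frees the slot
        bool acceptedLater = await pending;
        return wasPending && first == 1 && acceptedLater;
    }

    public static async Task Main()
    {
        Console.WriteLine(CountAcceptedPosts(3, 10));           // prints 3
        Console.WriteLine(await SendAsyncWaitsForSpaceAsync()); // prints True
    }
}
```

So in the sample above the "Posted" lines are printed even for messages the buffer rejected, because the return value of Post() is ignored.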
Update after accepting an answer:
ActionBlock also has its own input buffer. That's one thing. You also need to realize that, because all blocks have their own input buffers, you don't need the BufferBlock for what its name might imply. A BufferBlock is more like a utility block for more complex architectures, or a load-balancing block. But it's not a backpressure buffer.
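To illustrate the point about the built-in input buffer, here is a minimal sketch with no BufferBlock at all: the ActionBlock is bounded and the producer uses SendAsync(), so the producer is throttled by the consumer directly. The names BoundedActionBlockDemo and RunAsync, the 50 ms delay, and the capacity values are illustrative assumptions, not anything from the original pipeline.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedActionBlockDemo
{
    // Sends `count` items through a bounded ActionBlock and
    // returns the items that were processed, sorted.
    public static async Task<int[]> RunAsync(int count)
    {
        var processed = new ConcurrentQueue<int>();
        var action = new ActionBlock<int>(async i =>
        {
            await Task.Delay(50);   // simulate a slow consumer
            processed.Enqueue(i);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 3,    // bounds the block's own input buffer
        });

        for (int i = 0; i < count; i++)
        {
            // Waits (asynchronously) while the block has no room.
            await action.SendAsync(i);
        }

        action.Complete();          // no more input
        await action.Completion;    // all accepted items are processed
        return processed.OrderBy(x => x).ToArray();
    }

    public static async Task Main()
    {
        int[] result = await RunAsync(10);
        Console.WriteLine(string.Join(",", result)); // prints 0,1,2,3,4,5,6,7,8,9
    }
}
```

Nothing is dropped here because SendAsync() is awaited before the next item is produced; the result is sorted only because two items are processed in parallel and may finish out of order.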
When calling .LinkTo(), you need to explicitly pass new DataflowLinkOptions {PropagateCompletion = true} as the 2nd argument. Otherwise completion does not propagate to the linked block.
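A small sketch of what that option changes: the same two-block chain is built with and without PropagateCompletion, the head is completed, and we check whether the tail's Completion task finishes within a timeout. The names PropagateCompletionDemo and TargetCompletesAsync and the timeout values are hypothetical, chosen just for this demonstration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionDemo
{
    // Links a buffer to an action block, completes the buffer, and reports
    // whether the action block completed before the timeout elapsed.
    public static async Task<bool> TargetCompletesAsync(bool propagate, TimeSpan timeout)
    {
        var buffer = new BufferBlock<int>();
        var action = new ActionBlock<int>(_ => { });
        buffer.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = propagate });

        buffer.Post(1);
        buffer.Complete();   // completes the head of the chain only

        Task finished = await Task.WhenAny(action.Completion, Task.Delay(timeout));
        return finished == action.Completion;
    }

    public static async Task Main()
    {
        // With propagation the tail completes once the item is processed.
        Console.WriteLine(await TargetCompletesAsync(true, TimeSpan.FromSeconds(5)));
        // Without it the tail never completes, so the timeout wins.
        Console.WriteLine(await TargetCompletesAsync(false, TimeSpan.FromMilliseconds(500)));
    }
}
```

This is why ab.Completion.Wait() in the original sample never returns: bb.LinkTo(ab) uses the default options, where PropagateCompletion is false.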
With the guidance from JSteward's answer, I came up with the following code. It produces (reads etc.) new items concurrently with processing said items, maintaining a read-ahead buffer. The completion signal is sent to the head of the chain when the "producer" has no more items. The program also awaits the completion of the whole chain before terminating.
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program {
    static async Task Main() {
        string Time() => $"{DateTime.Now:hh:mm:ss.fff}";
        // the buffer is added to the chain just for demonstration purposes
        // the chain would work fine using just the built-in input buffer
        // of the `action` block.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions {BoundedCapacity = 3});
        var action = new ActionBlock<int>(async i =>
        {
            Console.WriteLine($"[{Time()}]: Processing: {i}");
            await Task.Delay(500);
        }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 2, BoundedCapacity = 2});
        // it's necessary to set the `PropagateCompletion` property
        buffer.LinkTo(action, new DataflowLinkOptions {PropagateCompletion = true});
        // producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            Console.WriteLine($"[{Time()}]: Ready to send: {i}");
            // SendAsync waits until the bounded buffer has room
            await buffer.SendAsync(i);
            Console.WriteLine($"[{Time()}]: Sent: {i}");
        }
        // we call `.Complete()` on the head of the chain and it's propagated forward
        buffer.Complete();
        await action.Completion;
    }
}