Tags: c#, task-parallel-library, dataflow, tpl-dataflow

Backpressure via BufferBlock not working (C# TPL Dataflow)


Typical situation: a fast producer, a slow consumer, and the need to slow the producer down.
Here is sample code that doesn't work as I expected (explained below):

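using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// I assumed this block would behave like BlockingCollection, but it doesn't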
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});

bb.LinkTo(ab);

ab.Completion.Wait();

How I thought this code would work, but it doesn't:

  • BufferBlock bb is a blocking queue with a capacity of 3. Once the capacity is reached, the producer should not be able to .Post() to it until there's a vacant slot.
    • It doesn't work like that: bb seems to happily accept any number of messages.
  • producer is a task that quickly posts messages. Once all messages have been posted, the call to bb.Complete() should propagate through the pipeline and signal shutdown once all messages have been processed. Hence the ab.Completion.Wait(); at the end.
    • That doesn't work either. As soon as .Complete() is called, the action block ab won't receive any more messages.
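For reference, the difference between Post and SendAsync on a full bounded block can be seen in isolation. This is a minimal sketch, assuming C# 9+ top-level statements and a project that references System.Threading.Tasks.Dataflow; the buffer is deliberately left unlinked so that nothing drains it.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A bounded buffer with no linked target, so nothing ever drains it.
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 3 });

// Post never waits: once the buffer holds 3 items it returns false.
var posted = new bool[5];
for (int i = 0; i < posted.Length; i++)
{
    posted[i] = buffer.Post(i);
    Console.WriteLine($"Post({i}) -> {posted[i]}");
}

// SendAsync instead returns a task that stays pending until there is room.
Task<bool> pending = buffer.SendAsync(99);
bool completedEarly = pending.IsCompleted;
Console.WriteLine($"SendAsync(99) completed immediately: {completedEarly}");

// Receiving one item frees a slot, so the pending send can be accepted.
int first = buffer.Receive();
bool accepted = await pending;
Console.WriteLine($"Received {first}, SendAsync(99) accepted: {accepted}");
```

The first three Post calls should return true and the last two false, while the SendAsync task only completes after Receive has made room. In the question's code bb never reaches that full state, because it keeps forwarding its items to ab.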

This can be done with a BlockingCollection, and I thought BufferBlock was its equivalent in the TPL Dataflow (TDF) world. I guess I'm misunderstanding how backpressure is supposed to work in TPL Dataflow.
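For comparison, this is roughly what the BlockingCollection behaviour looks like: a minimal sketch with a collection bounded to 3 items, a fast producer task and a slow consumer, assuming C# 9+ top-level statements. The 100 ms delay and the item count are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// At most 3 items can wait in the collection; Add blocks while it is full.
using var queue = new BlockingCollection<int>(boundedCapacity: 3);

// Fast producer: blocks inside Add whenever the consumer falls behind.
var producer = Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        queue.Add(i);
        Console.WriteLine($"Added: {i}");
    }
    queue.CompleteAdding(); // ends GetConsumingEnumerable once the queue is empty
});

// Slow consumer on the main thread.
var received = new List<int>();
foreach (var item in queue.GetConsumingEnumerable())
{
    Thread.Sleep(100); // simulate slow work
    received.Add(item);
    Console.WriteLine($"Received: {item}");
}
await producer;
```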

So where's the catch? How do I run this pipeline without allowing more than 3 messages in the buffer bb, and wait for its completion?

PS: I found this gist (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938) that suggests maintaining a semaphore to block writes to the BufferBlock. I thought this was "built-in".
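The general idea of the semaphore approach can be sketched as follows. This is not the gist's code: it is a minimal illustration, assuming C# 9+ top-level statements and a reference to System.Threading.Tasks.Dataflow, and for brevity it posts straight to an ActionBlock instead of a BufferBlock. The names slots, inFlight and maxInFlight are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// At most 3 items may be in flight: posted but not yet fully processed.
var slots = new SemaphoreSlim(3);
int inFlight = 0, maxInFlight = 0, processed = 0;

var action = new ActionBlock<int>(async i =>
{
    try
    {
        await Task.Delay(100); // simulate slow work
        Interlocked.Increment(ref processed);
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
        slots.Release(); // hand the slot back to the producer
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

for (int i = 0; i < 10; i++)
{
    await slots.WaitAsync(); // wait for a free slot before posting
    int now = Interlocked.Increment(ref inFlight);
    maxInFlight = Math.Max(maxInFlight, now); // only the producer writes this
    action.Post(i);
}

action.Complete();
await action.Completion;
Console.WriteLine($"Processed {processed}, max in flight {maxInFlight}");
```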

Update after accepting an answer:

If you're looking at this question, you need to remember that ActionBlock also has its own input buffer.

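That's one thing. You also need to realize that, because all blocks have their own input buffers, you don't need a BufferBlock for what its name might imply. A BufferBlock is more of a utility block for more complex architectures, or a load-balancing block, but it's not a backpressure buffer by itself. Backpressure comes from setting BoundedCapacity on the blocks and sending with SendAsync, which waits for room, instead of Post, which simply returns false when the target is full.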
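The difference between an ActionBlock's default, unbounded input buffer and a bounded one can be seen by posting to both while their workers are held on a gate. In the question's code this is why bb seems to accept everything: it immediately forwards each item to ab, whose input buffer has no limit. A minimal sketch, assuming C# 9+ top-level statements and a reference to System.Threading.Tasks.Dataflow; the gate is only there to keep items from finishing during the test.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hold every worker on a gate so that no item finishes while we post.
using var gate = new ManualResetEventSlim(false);

// Default options: the ActionBlock's input buffer is unbounded.
var unbounded = new ActionBlock<int>(_ => gate.Wait());

// BoundedCapacity counts the item being processed plus those waiting.
var bounded = new ActionBlock<int>(_ => gate.Wait(),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

int acceptedUnbounded = 0, acceptedBounded = 0;
for (int i = 0; i < 10; i++)
{
    if (unbounded.Post(i)) acceptedUnbounded++;
    if (bounded.Post(i)) acceptedBounded++;
}
Console.WriteLine($"Unbounded accepted {acceptedUnbounded}, bounded accepted {acceptedBounded}");

// Release the workers and shut both blocks down.
gate.Set();
unbounded.Complete();
bounded.Complete();
await Task.WhenAll(unbounded.Completion, bounded.Completion);
```

The unbounded block should accept all 10 items, while the bounded one accepts only 2.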

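Completion propagation needs to be defined explicitly at the link level. Otherwise, completing the head of the chain leaves the downstream blocks uncompleted, so waiting on their Completion never returns.

When calling .LinkTo(), you need to explicitly pass new DataflowLinkOptions {PropagateCompletion = true} as the second argument.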
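A minimal sketch of the difference PropagateCompletion makes, assuming C# 9+ top-level statements and a reference to System.Threading.Tasks.Dataflow. FinishesWithin is a hypothetical helper that waits at most a given time, because without propagation the tail's Completion task never finishes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Returns true if the task finishes within the given time.
static async Task<bool> FinishesWithin(Task task, int milliseconds) =>
    await Task.WhenAny(task, Task.Delay(milliseconds)) == task;

// Link without options: completing the head does not complete the tail.
var head1 = new BufferBlock<int>();
var tail1 = new ActionBlock<int>(_ => { });
head1.LinkTo(tail1);
head1.Complete();
bool withoutPropagation = await FinishesWithin(tail1.Completion, 500);

// Link with PropagateCompletion: the tail completes after draining its input.
int processed = 0;
var head2 = new BufferBlock<int>();
var tail2 = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));
head2.LinkTo(tail2, new DataflowLinkOptions { PropagateCompletion = true });
for (int i = 0; i < 3; i++) head2.Post(i);
head2.Complete();
bool withPropagation = await FinishesWithin(tail2.Completion, 500);

Console.WriteLine($"Tail completed without propagation: {withoutPropagation}");
Console.WriteLine($"Tail completed with propagation: {withPropagation}, processed {processed}");
```

Without the option tail1 is never completed; with it, tail2 completes only after processing the three buffered items.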


Solution

  • With the guidance from JSteward's answer, I came up with the following code. It produces (reads, etc.) new items concurrently with processing them, maintaining a read-ahead buffer. When the "producer" has no more items, the completion signal is sent to the head of the chain, and the program awaits the completion of the whole chain before terminating.

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    class Program
    {
        static async Task Main()
        {
            string Time() => $"{DateTime.Now:HH:mm:ss.fff}";
    
            // the buffer is added to the chain just for demonstration purposes;
            // the chain would work fine using just the built-in input buffer
            // of the `action` block.
            var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 3 });
    
            var action = new ActionBlock<int>(async i =>
            {
                Console.WriteLine($"[{Time()}]: Processing: {i}");
                await Task.Delay(500);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 2 });
    
            // it's necessary to set the `PropagateCompletion` property
            buffer.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
    
            // Producer: SendAsync waits while `buffer` is full, instead of returning false like Post
            foreach (var i in Enumerable.Range(0, 10))
            {
                Console.WriteLine($"[{Time()}]: Ready to send: {i}");
                await buffer.SendAsync(i);
                Console.WriteLine($"[{Time()}]: Sent: {i}");
            }
    
            // we call `.Complete()` on the head of the chain and it's propagated forward
            buffer.Complete();
    
            await action.Completion;
        }
    }
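As the comment in the solution says, the BufferBlock is optional. A minimal sketch of the same pipeline with only the ActionBlock, assuming C# 9+ top-level statements and a reference to System.Threading.Tasks.Dataflow; BoundedCapacity = 5 is an arbitrary choice meaning 2 items in processing plus up to 3 waiting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int processed = 0;

// BoundedCapacity covers both the items being processed and those waiting.
var action = new ActionBlock<int>(async i =>
{
    await Task.Delay(100); // simulate slow work
    Interlocked.Increment(ref processed);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 5 });

// Producer: SendAsync waits whenever the block already holds 5 items.
for (int i = 0; i < 10; i++)
{
    if (!await action.SendAsync(i))
    {
        break; // the block declined permanently, e.g. because it faulted
    }
}

// No link to propagate through: complete the ActionBlock directly.
action.Complete();
await action.Completion;
Console.WriteLine($"Processed {processed} items");
```

Checking the result of SendAsync lets the producer stop if the block stops accepting items, for example after a fault.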