Search code examples
c#task-parallel-libraryproducer-consumerdataflowtpl-dataflow

Is TPL Dataflow BufferBlock thread safe?


I have a fairly simple producer-consumer pattern where (simplified) I have two producers who produce output that is to be consumed by one consumer.

For this I use System.Threading.Tasks.Dataflow.BufferBlock<T>

A BufferBlock object is created. One Consumer is listening to this BufferBlock, and processes any received input.

Two 'Producerssend data to theBufferBlock` simultaneously

Simplified:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

I'd like to start the Consumer first and then start the Producers as separate tasks:

var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());

// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock

// await for the Consumer to finish:
await taskConsumer;

At first glance, this is exactly how the producer-consumer was meant: several producers produce data while a consumer is consuming the produced data.

Yet, BufferBlock about thread safety says:

Any instance members are not guaranteed to be thread safe.

And I thought that the P in TPL meant Parallel! Should I worry? Is my code not thread safe? Is there a different TPL Dataflow class that I should use?


Solution

  • Yes, the BufferBlock class is thread safe. I can't back this claim by pointing to an official document, because the "Thread Safety" section has been removed from the documentation. But I can see in the source that the class contains a lock object for synchronizing the incoming messages:

    /// <summary>Gets the lock object used to synchronize incoming requests.</summary>
    private object IncomingLock { get { return _source; } }
    

    When the Post extension method is called (source code), the explicitly implemented ITargetBlock.OfferMessage method is invoked (source code). Below is an excerpt of this method:

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        //...
        lock (IncomingLock)
        {
            //...
            _source.AddMessage(messageValue);
            //...
        }
    }
    

    It would be strange indeed if this class, or any other XxxBlock class included in the TPL Dataflow library, was not thread-safe. It would severely hamper the ease of use of this great library.