Tags: c#, performance, task-parallel-library, producer-consumer, blockingcollection

Producer consumer queue best practices and performance


I'm building a producer-consumer queue in C# and I've been researching which approach is best in terms of robustness and performance. For years I always used BlockingCollection, but I have since discovered TPL Dataflow and Channels.

I have done some benchmarking and found that both TPL Dataflow and Channels are much faster at dequeueing elements.

My requirements are:

  • Queue behaviour (maintain item ordering)
  • Multiple threads can enqueue elements
  • One thread reading elements (to maintain order)

IProducerConsumer interface:

public interface IProducerConsumer
{
    void Enqueue(Action item);
    void Stop();
    void StartDequeing();
}

Blocking collection implementation:

public class BlockingCollectionQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _defaultQueue;
    private Task _dequeTask;

    public BlockingCollectionQueue()
    {
        _defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    public void Enqueue(Action item)
    {
        if (!_defaultQueue.IsAddingCompleted)
        {
            _defaultQueue.Add(item);
        }
    }

    public void Stop()
    {
        _defaultQueue.CompleteAdding();
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private void DequeueTask()
    {
        foreach (var item in _defaultQueue.GetConsumingEnumerable())
        {
            item?.Invoke();
        }
    }
}

Channel implementation:

public class ChannelQueue : IProducerConsumer
{
    private readonly Channel<Action> _channel;
    private readonly ChannelWriter<Action> _channelWriter;
    private readonly ChannelReader<Action> _channelReader;

    public ChannelQueue()
    {
        _channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
        _channelWriter = _channel.Writer;
        _channelReader = _channel.Reader;
    }

    public void Enqueue(Action item)
    {
        _channelWriter.TryWrite(item);
    }

    public void StartDequeing()
    {
        Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _channelReader.WaitToReadAsync())
        {
            while (_channelReader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _channelWriter.Complete();
    }
}

TPLDataFlow using BufferBlock implementation:

public class DataFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock;
    private Task _dequeTask;

    public DataFlowQueue()
    {
        var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
        _bufferBlock = new BufferBlock<Action>(dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _bufferBlock.OutputAvailableAsync())
        {
            while (_bufferBlock.TryReceive(out var item))
            {
                item?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}

TPLDataFlow using ActionBlock:

public class ActionBlockQueue : IProducerConsumer
{
    private readonly ActionBlock<Action> _actionBlock;
    
    public ActionBlockQueue()
    {
        var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
        _actionBlock = new ActionBlock<Action>(item => item?.Invoke(), dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _actionBlock.Post(item);
    }

    public void StartDequeing()
    {
    }

    public void Stop()
    {
        _actionBlock.Complete();
    }
}

Benchmark using BenchmarkDotNet

As you can see, all of the implementations store Action delegates in their queues, and I'm using an AutoResetEvent to signal when the last element has been dequeued.

public class MultipleJobBenchMark
{
    private AutoResetEvent _autoResetEvent;

    public MultipleJobBenchMark()
    {
        _autoResetEvent = new AutoResetEvent(false);
    }

    [Benchmark]
    public void BlockingCollectionQueue()
    {
        DoMultipleJobs(new BlockingCollectionQueue());
    }

    [Benchmark]
    public void DataFlowQueue()
    {
        DoMultipleJobs(new DataFlowQueue());
    }

    [Benchmark]
    public void ActionBlockQueue()
    {
        DoMultipleJobs(new ActionBlockQueue());
    }

    [Benchmark]
    public void ChannelQueue()
    {
        DoMultipleJobs(new ChannelQueue());
    }

    private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
    {
        producerConsumerQueue.StartDequeing();
        int jobs = 100000;

        for (int i = 0; i < jobs - 1; i++)
        {
            producerConsumerQueue.Enqueue(() => { });
        }

        producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
        _autoResetEvent.WaitOne();
        producerConsumerQueue.Stop();
    }
}

Results

  • BlockingCollection queue: Mean 21.5 ms
  • BufferBlock queue: Mean 14.937 ms
  • ActionBlock queue: Mean 6.007 ms
  • Channel queue: Mean 4.781 ms

Questions and conclusions

From this exercise I have concluded that BlockingCollection may no longer be the best option.

I don't understand why there is such a big difference between BufferBlock and ActionBlock. I implemented both because my interface defines a StartDequeing() method, which cannot be honoured with ActionBlock, since its dequeuing starts as soon as the ActionBlock is constructed.

Is my implementation using BufferBlock the best?

I wanted to post my results here to learn which producer-consumer queue is the most widely accepted at the moment, and to understand why I'm seeing such a big difference between ActionBlock and BufferBlock.


Solution

As your benchmarks reveal, Channel<T> is a more performant producer-consumer queue than BlockingCollection<T>. This is reasonable, since Channel<T> is a newer component (2019) and takes advantage of the ValueTask<T> technology that did not exist when BlockingCollection<T> was introduced (2010). For this to have any measurable effect, though, you must be pushing a very large number of items per second through the queue. In that case it might be a good idea to consider processing the items in batches/chunks instead of passing each item individually through the queue, as sketched below.
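
Here is a minimal sketch of the batching idea, keeping the Channel<T> approach from the question. The class name BatchingChannelQueue, the chunk size of 128, and the lock-based buffering are illustrative assumptions, not part of the question or the answer:

using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public class BatchingChannelQueue
{
    // Each channel element is a chunk of work items instead of a single delegate.
    private readonly Channel<Action[]> _channel =
        Channel.CreateUnbounded<Action[]>(new UnboundedChannelOptions { SingleReader = true });
    private readonly List<Action> _buffer = new List<Action>();
    private readonly object _lock = new object();
    private const int ChunkSize = 128; // illustrative value

    public void Enqueue(Action item)
    {
        lock (_lock)
        {
            _buffer.Add(item);
            if (_buffer.Count >= ChunkSize)
            {
                // One channel write per 128 items, instead of one write per item.
                _channel.Writer.TryWrite(_buffer.ToArray());
                _buffer.Clear();
            }
        }
    }

    public async Task DequeueTask()
    {
        while (await _channel.Reader.WaitToReadAsync())
        {
            while (_channel.Reader.TryRead(out var chunk))
            {
                foreach (var job in chunk)
                {
                    job?.Invoke();
                }
            }
        }
    }
}

Note that a real implementation would also need to flush a partially filled buffer, for example on Stop() or on a timer; otherwise the last few items could sit in _buffer indefinitely.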

In general I think that BlockingCollection<T> is still a good option when your producer-consumer system is synchronous, i.e. when the producer and the consumer run on dedicated threads. Channel<T> is the natural choice when you want to build an asynchronous system, i.e. you are calling asynchronous APIs and want to make efficient use of threads. As for the components in the TPL Dataflow library, they are a valid option when you want to build an asynchronous system that can run on older versions of .NET. There are very few reasons to prefer the older BufferBlock<T> over the newer Channel<T> when both are available: Channel<T> has a cleaner and more expressive API and offers more options, such as the ability to drop old items when new items are added and the maximum capacity has been reached, as shown below.
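
For example, a bounded channel can be configured to drop the oldest queued item whenever the capacity is reached (the capacity of 1000 below is an arbitrary example value):

using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<Action>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    // When the channel is full, silently discard the oldest queued item
    // so that TryWrite always succeeds.
    FullMode = BoundedChannelFullMode.DropOldest
});

channel.Writer.TryWrite(() => Console.WriteLine("work item"));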

A rare scenario where you may want to avoid Channel<T> is when the producer, the consumer, or both pass a cancellation token to each and every asynchronous write/read operation, and those tokens are routinely canceled. This usage can trigger a memory leak in Channel<T>, but not in BufferBlock<T>. See this question for details.
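
To make that usage pattern concrete, here is a sketch of a consumer that imposes a per-read timeout by canceling each ReadAsync call. The class name, method name, and the 50 ms timeout are illustrative assumptions, not taken from the answer:

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PollingConsumer
{
    public static async Task ConsumeWithTimeoutsAsync(ChannelReader<Action> reader)
    {
        while (true)
        {
            // A fresh token per read, canceled after 50 ms if no item arrives.
            // Routinely canceling per-operation tokens like this is the usage
            // pattern the answer warns about.
            using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
            try
            {
                var item = await reader.ReadAsync(cts.Token);
                item?.Invoke();
            }
            catch (OperationCanceledException)
            {
                // Timed out waiting: do periodic housekeeping, then loop and read again.
            }
            catch (ChannelClosedException)
            {
                return; // The writer called Complete() and the channel is empty.
            }
        }
    }
}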