All of the C# implementations of producer-consumer collections [1] [2] seem to have interfaces vaguely similar to:
private Queue<T> items;
public void Produce(T item)
public T Consume()
Any implementations out there like the following?
private Queue<T> items;
public void Produce(T[] item)
public T[] Consume(int count)
The hope is that this would let me produce/consume varying numbers of items at a time without requiring excessive per-item locking. This seems necessary for producing/consuming large numbers of items, but I haven't had any luck finding such an implementation.
There are multiple possible ways depending on what exactly you want to implement.
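First, there are the implementations of the IProducerConsumerCollection<T> interface. In the .NET Framework these are ConcurrentQueue<T>, ConcurrentStack<T> and ConcurrentBag<T>. The class built for producer-consumer scenarios on top of them is BlockingCollection<T>, which wraps such a collection rather than implementing the interface itself.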
This class allows you to have blocking or non-blocking producers and consumers. Whether a producer can block is decided by passing a capacity limit to the constructor. As the documentation of the BlockingCollection<T>.Add(T) method states:
If a bounded capacity was specified when this instance of BlockingCollection<T> was initialized, a call to Add may block until space is available to store the provided item.
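To make the bounded-capacity behaviour concrete, here is a minimal sketch. The class and method names (BoundedCapacityDemo, ThirdAddSucceeds) are made up for illustration; only BlockingCollection<T>, Add and TryAdd come from the framework. It uses TryAdd for the third item so the demo returns instead of blocking forever.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class, not part of any library.
public static class BoundedCapacityDemo
{
    // Fills a collection bounded to two items and reports whether
    // a third, non-blocking add is accepted.
    public static bool ThirdAddSucceeds()
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: 2))
        {
            queue.Add(1); // space available, returns immediately
            queue.Add(2); // collection is now full

            // queue.Add(3) would block here until a consumer takes an item.
            // TryAdd returns immediately instead of blocking.
            return queue.TryAdd(3);
        }
    }

    public static void Main()
    {
        Console.WriteLine(ThirdAddSucceeds()); // prints "False"
    }
}
```

Without a capacity argument the collection is unbounded, so Add never blocks for lack of space.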
For fetching items you can use the various Take and TryTake methods or the extremely handy BlockingCollection<T>.GetConsumingEnumerable() method. It returns an IEnumerable<T> whose enumerator removes one element from the BlockingCollection<T> each time it fetches the next value, and blocks while the source collection is empty. That lasts until BlockingCollection<T>.CompleteAdding() is called and the collection stops accepting new data. At that point every consuming enumerable stops blocking and, as soon as all remaining data has been consumed, reports that there is no more data.
So you can basically implement a consumer like this:
BlockingCollection<...> bc = ...
foreach (var item in bc.GetConsumingEnumerable())
{
    // do something with your item
}
Such a consumer can be started on several threads, so that multiple threads read from the same source if you choose to. You can create any number of consuming enumerables.
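A rough sketch of several consumers draining one collection could look like the following. MultiConsumerDemo and SumWithConsumers are hypothetical names, and summing integers merely stands in for real work. Each item is handed to exactly one consumer, and CompleteAdding is what lets the foreach loops end.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class MultiConsumerDemo
{
    // Adds 1..itemCount and lets consumerCount tasks sum the items.
    public static long SumWithConsumers(int itemCount, int consumerCount)
    {
        long total = 0;
        using (var queue = new BlockingCollection<int>())
        {
            // Every task gets its own consuming enumerable over the same source.
            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, item);
                    }
                }))
                .ToArray();

            for (int i = 1; i <= itemCount; i++)
            {
                queue.Add(i);
            }

            // Signal that no more items will arrive so the loops can finish.
            queue.CompleteAdding();
            Task.WaitAll(consumers);
        }
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(SumWithConsumers(100, 4)); // prints "5050"
    }
}
```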
You should be aware that this collection is really only a wrapper. There is a constructor that lets you choose the kind of collection used underneath; by default it is a ConcurrentQueue<T>. This means that by default the collection behaves like that queue and is a first-in-first-out collection, at least as long as you use only one producer and one consumer.
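As a small illustration of the wrapper idea, the sketch below passes different underlying collections to the constructor and looks at which item comes out first. UnderlyingCollectionDemo and FirstTaken are hypothetical names; everything runs on a single thread so the order is deterministic.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class, not part of any library.
public static class UnderlyingCollectionDemo
{
    // Adds 1, 2, 3 through the wrapper and returns the first item taken.
    public static int FirstTaken(IProducerConsumerCollection<int> storage)
    {
        using (var collection = new BlockingCollection<int>(storage))
        {
            collection.Add(1);
            collection.Add(2);
            collection.Add(3);
            return collection.Take();
        }
    }

    public static void Main()
    {
        int fifo = FirstTaken(new ConcurrentQueue<int>()); // queue: oldest item first
        int lifo = FirstTaken(new ConcurrentStack<int>()); // stack: newest item first
        Console.WriteLine(fifo + " " + lifo); // prints "1 3"
    }
}
```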
All that being said, there is an alternative. If you don't need the blocking part (or you want to implement it yourself) and you don't require any ordering of the elements in your collection, there is ConcurrentBag<T>. This collection handles access from multiple threads very efficiently. It uses smaller collections inside ThreadLocal<T> wrappers, so each thread uses its own storage, and only when a thread runs out of items in its own storage does it start fetching items from another thread's storage.
Using this collection may be interesting if producing and consuming happen sequentially in your use case: you first add all items, and once that is done you consume all of them, both with multiple threads.
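The two-phase pattern just described might be sketched like this. TwoPhaseBagDemo and ProduceThenConsume are hypothetical names, and using Parallel.For with one worker per processor is only one possible way to spread the work. The order in which items come out of the bag is unspecified, which is why the example only checks a sum.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class TwoPhaseBagDemo
{
    // Phase 1 adds 1..itemCount in parallel, phase 2 drains the bag in parallel.
    public static long ProduceThenConsume(int itemCount)
    {
        var bag = new ConcurrentBag<int>();

        // Phase 1: many threads add items (upper bound is exclusive).
        Parallel.For(1, itemCount + 1, i => bag.Add(i));

        // Phase 2: each worker takes items until the bag is empty.
        long total = 0;
        Parallel.For(0, Environment.ProcessorCount, _ =>
        {
            while (bag.TryTake(out int item))
            {
                Interlocked.Add(ref total, item);
            }
        });

        return total;
    }

    public static void Main()
    {
        Console.WriteLine(ProduceThenConsume(1000)); // prints "500500"
    }
}
```

Because nothing is added during the second phase, an empty bag reliably means the work is finished, so no blocking or completion signal is needed.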