I'm using a ConcurrentQueue<T>. I need:
public class Cache
{
    private readonly ConcurrentQueue<Item> _queue = new();

    public void Enqueue(Item item) // what synchronisation to perform here?
    {
        ArgumentNullException.ThrowIfNull(item, nameof(item));
        _queue.Enqueue(item);
    }

    public void Flush() // what synchronisation to perform here?
    {
        // dequeue all items
        var items = new List<Item>();
        while (_queue.TryDequeue(out var item))
            items.Add(item);

        // `DoSomething` could take time; therefore above we
        // dequeued and cached all items, so other producers
        // can immediately continue enqueuing
        foreach (var item in items)
            DoSomething(item);
    }

    public void DoSomething(Item item)
    {
        // ...
    }
}
I'm not sure which concurrency tool is best for synchronising access to _queue (in Enqueue and Flush). Should I use a simple lock, or something more exotic?
UPDATE: to address the comments below, I'm not attached to ConcurrentQueue; if there's a more appropriate concurrent collection, please tell me.
The simplest option would be to just use a lock and a list:
public class Cache
{
    private List<Item> _queue = new();
    private readonly Lock _lockObj = new();

    public void Enqueue(Item item)
    {
        lock (_lockObj)
        {
            _queue.Add(item);
        }
    }

    public void Flush()
    {
        List<Item> items;
        lock (_lockObj)
        {
            items = _queue;
            _queue = new List<Item>();
        }

        foreach (var item in items)
            DoSomething(item);
    }

    public void DoSomething(Item item)
    {
        // ...
    }
}
This should perform fairly well given that the lock is only held momentarily. You might want to set the list capacity to avoid somewhat expensive reallocations. There are almost certainly other, faster ways to do this, but optimizations usually have a cost in readability.
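The capacity hint helps because a List<T> only reallocates its backing array when it outgrows its current capacity. A minimal demonstration (the 1,000 figure is an arbitrary assumption; tune it to your workload):

```csharp
using System;
using System.Collections.Generic;

// A list created with an initial capacity never reallocates
// as long as it stays at or under that capacity.
const int expectedBatchSize = 1_000; // assumed batch size, not a recommendation

var withHint = new List<int>(expectedBatchSize);
for (int i = 0; i < expectedBatchSize; i++)
    withHint.Add(i);

Console.WriteLine(withHint.Capacity); // still 1000: no regrowth occurred
```

In the Cache above, you would pass the same hint to both `new List<Item>(...)` calls (the initial field and the replacement created in Flush).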
But the question reveals a possibly problematic view. The only point where the queue is guaranteed to be empty is just after _queue = new List<Item>(), before the lock is released. But you are not doing anything at this point, and as soon as the lock is released there might be items in the queue again.
Put another way, a thread that enqueues an item has no way to know whether it was included in a flush, not unless there is some other form of synchronization. Nor can the flushing thread know whether there are pending enqueues. So there should not really be any difference compared to your original version.
So it is not really clear what the actual goal is. Statements like "a single point in time" are problematic since there is no global ordering; the only order you can rely on is the one provided by the various synchronization primitives. Is DoSomething required to be synchronized with regard to Enqueue? If so, it needs to run inside the lock. Maybe you need some external synchronization to ensure all enqueuing has been done before flushing?
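If DoSomething really must be synchronized with Enqueue, the flush has to hold the lock for its whole duration. A sketch of that variant, with the obvious cost that enqueuers block for the entire flush (Item and the ProcessedCount property are placeholders I made up for illustration):

```csharp
using System.Collections.Generic;

public record Item(int Id); // placeholder item type

public class Cache
{
    private readonly List<Item> _queue = new();
    private readonly object _lockObj = new(); // plain object lock; .NET 9's Lock works too

    public int ProcessedCount { get; private set; } // stand-in for observable work

    public void Enqueue(Item item)
    {
        lock (_lockObj)
        {
            _queue.Add(item);
        }
    }

    // DoSomething now runs inside the lock, so it is fully synchronized
    // with Enqueue -- but every enqueuer blocks until the flush finishes.
    public void Flush()
    {
        lock (_lockObj)
        {
            foreach (var item in _queue)
                DoSomething(item);
            _queue.Clear();
        }
    }

    private void DoSomething(Item item) => ProcessedCount++;
}
```

Whether that trade-off is acceptable depends entirely on how long DoSomething takes and how sensitive the enqueuers are to latency.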
In general I would suggest taking a look at BlockingCollection; I find it more useful than a raw ConcurrentQueue. TPL Dataflow and System.Threading.Channels are other libraries to consider.
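For example, with a BlockingCollection a dedicated consumer task drains items as they arrive, so there is no separate Flush step to synchronize at all. A minimal sketch (the int items and the ConcurrentBag are stand-ins for your Item and DoSomething):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var queue = new BlockingCollection<int>();
var processed = new ConcurrentBag<int>();

var consumer = Task.Run(() =>
{
    // Blocks while the collection is empty, and the loop ends once
    // CompleteAdding() has been called and the collection is drained.
    foreach (var item in queue.GetConsumingEnumerable())
        processed.Add(item); // stand-in for DoSomething(item)
});

for (int i = 0; i < 5; i++)
    queue.Add(i);

queue.CompleteAdding(); // signal that no more items will arrive
await consumer;         // wait until everything has been processed

Console.WriteLine(processed.Count); // 5
```

Note that producers never wait for the consumer here; if you need back-pressure, BlockingCollection also accepts a bounded capacity in its constructor, which makes Add block when the collection is full.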