I need to use a concurrent priority queue, and I was considering adapting the SimplePriorityQueue<TPriority, TValue> sample given in the How to: Add Bounding and Blocking Functionality to a Collection tutorial on MSDN. However, I was surprised at the severity of the bugs that the sample seems to have. Could someone verify whether these issues are really present?
1) A race hazard exists between TryAdd and ToArray, which can cause an ArgumentException to be thrown from the latter. The TryAdd method first adds an item to an internal queue, then increments the m_count counter. ToArray, on the other hand, first allocates a new array of size m_count, then copies the internal queues into this array. If TryAdd is called whilst ToArray is executing, ToArray might attempt to copy more items than it allocated space for, causing the CopyTo call to throw an ArgumentException.
    private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues;
    private int m_count;

    // ...

    // IProducerConsumerCollection members
    public bool TryAdd(KeyValuePair<int, TValue> item)
    {
        _queues[item.Key].Enqueue(item);
        Interlocked.Increment(ref m_count);
        return true;
    }

    public int Count
    {
        get { return m_count; }
    }

    public KeyValuePair<int, TValue>[] ToArray()
    {
        KeyValuePair<int, TValue>[] result;
        lock (_queues)
        {
            result = new KeyValuePair<int, TValue>[this.Count];
            // *** context switch here; new item gets added ***
            int index = 0;
            foreach (var q in _queues)
            {
                if (q.Count > 0)
                {
                    q.CopyTo(result, index); // *** ArgumentException ***
                    index += q.Count;
                }
            }
            return result;
        }
    }
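One way the sizing problem in ToArray could be avoided is to stop pre-sizing the array from m_count and instead concatenate per-queue snapshots. The following is only a minimal sketch under that assumption: SnapshotPriorityQueue is a hypothetical cut-down stand-in for the MSDN sample, not the sample itself, and it keeps only the members involved in this race.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, reduced stand-in for the MSDN sample (illustration only).
public class SnapshotPriorityQueue<TValue>
{
    private readonly ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues;

    public SnapshotPriorityQueue(int priorityCount)
    {
        _queues = new ConcurrentQueue<KeyValuePair<int, TValue>>[priorityCount];
        for (int i = 0; i < priorityCount; i++)
            _queues[i] = new ConcurrentQueue<KeyValuePair<int, TValue>>();
    }

    public bool TryAdd(KeyValuePair<int, TValue> item)
    {
        _queues[item.Key].Enqueue(item);
        return true;
    }

    public KeyValuePair<int, TValue>[] ToArray()
    {
        // Each ConcurrentQueue<T>.ToArray() call returns its own snapshot, so the
        // result is sized from the items actually copied, never from a separate counter.
        var result = new List<KeyValuePair<int, TValue>>();
        foreach (var q in _queues)
            result.AddRange(q.ToArray());
        return result.ToArray();
    }
}
```

This removes the ArgumentException, but the per-queue snapshots are still taken one after another, so the combined array is not a single consistent view across priorities.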
2) Another race hazard exists in the GetEnumerator method: there is no synchronization between updates to the internal queues.
    public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
    {
        for (int i = 0; i < priorityCount; i++)
        {
            foreach (var item in _queues[i])
                yield return item;
        }
    }
Consider the following code snippet, which takes an item from the queue and re-adds it with incremented priority:
    if (queue.TryTake(out item) && item.Key < maxPriority - 1)
        queue.TryAdd(new KeyValuePair<int, string>(item.Key + 1, item.Value));
If the above snippet were run concurrently with an enumeration, one would expect the item to appear at most once, either with the original or with the incremented priority, or possibly not at all. One would not expect the item to appear twice, at both priorities. However, since GetEnumerator iterates over its internal queues sequentially, it does not protect against such ordering inconsistencies across queues.
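The double sighting can be reproduced deterministically on a single thread by pausing an enumeration after its first item and performing the take-and-re-add by hand. This is a sketch only: EnumerationRaceDemo and its two-queue array are hypothetical stand-ins that mimic the sample's iterator, and the manual step in the middle plays the role of the other thread.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class EnumerationRaceDemo
{
    // Returns how many times the single item is observed by one enumeration.
    public static int CountSightings()
    {
        var queues = new[]
        {
            new ConcurrentQueue<KeyValuePair<int, string>>(),
            new ConcurrentQueue<KeyValuePair<int, string>>()
        };
        queues[0].Enqueue(new KeyValuePair<int, string>(0, "job"));

        int sightings = 0;
        using (var e = Enumerate(queues).GetEnumerator())
        {
            // The enumerator yields the item from the priority-0 queue.
            if (e.MoveNext()) sightings++;

            // Stand-in for another thread: take the item and re-add it
            // with incremented priority while the enumeration is paused.
            KeyValuePair<int, string> item;
            if (queues[0].TryDequeue(out item))
                queues[1].Enqueue(new KeyValuePair<int, string>(item.Key + 1, item.Value));

            // The enumerator now moves on to the priority-1 queue.
            while (e.MoveNext()) sightings++;
        }
        return sightings;
    }

    // Same shape as the sample's GetEnumerator: queues are visited one after another.
    private static IEnumerable<KeyValuePair<int, string>> Enumerate(
        ConcurrentQueue<KeyValuePair<int, string>>[] queues)
    {
        for (int i = 0; i < queues.Length; i++)
            foreach (var item in queues[i])
                yield return item;
    }
}
```

Each inner ConcurrentQueue<T> enumerator is a snapshot of that one queue, taken only when the outer loop reaches it, which is why the re-added item is still picked up.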
3) The public Count property can return stale values, since it reads the shared m_count field without any memory fences. If a consumer accesses this property in a loop that doesn't generate memory fences of its own, such as the one below, it could be stuck in an infinite loop despite items being added to the queue by other threads.
while (queue.Count == 0)
{ }
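A possible remedy, assuming .NET 4.5 or later, would be to read the field through Volatile.Read, which has acquire semantics and stops the JIT from hoisting the read out of a spin loop. The FencedCounter class below is a hypothetical reduction of the sample to just its counter, sketched for illustration.

```csharp
using System.Threading;

// Hypothetical reduction of the sample to its counter (illustration only).
public class FencedCounter
{
    private int m_count;

    public void Increment()
    {
        // Interlocked operations already act as full fences on the writer side.
        Interlocked.Increment(ref m_count);
    }

    public int Count
    {
        // Volatile.Read forces a fresh read of the field on every access.
        get { return Volatile.Read(ref m_count); }
    }
}
```

Whether the unfenced loop actually hangs in practice depends on the JIT and the hardware; the fenced read simply removes the dependence on either.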
The need for memory fences when reading shared variables has been discussed in several other posts.
4) There is no memory barrier between the initialization of the _queues array and the completion of the SimplePriorityQueue constructor. Race hazards could arise when external consumers on another thread call TryAdd and access _queues before its initialization has completed (or appears as completed on their memory cache). This is discussed further in my other question on constructors and memory barriers.
5) TryTake and ToArray are protected through the use of the lock keyword. Apart from being inadequate (due to the bugs discussed above), this also defeats the entire purpose of designing a concurrent collection. Given its shortcomings, I think the best approach would be to downgrade the internal ConcurrentQueue structures to plain Queue, add locks everywhere, and start treating this as a non-concurrent but thread-safe structure.
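The lock-everywhere alternative might look roughly like the sketch below. LockedPriorityQueue and its _sync field are hypothetical names, and the sketch assumes that lower indices are taken first; it covers only the members discussed above rather than the full IProducerConsumerCollection interface.

```csharp
using System.Collections.Generic;

// Hypothetical lock-based variant (illustration only, not the MSDN sample).
public class LockedPriorityQueue<TValue>
{
    private readonly object _sync = new object();
    private readonly Queue<KeyValuePair<int, TValue>>[] _queues;
    private int _count; // only touched while holding _sync

    public LockedPriorityQueue(int priorityCount)
    {
        _queues = new Queue<KeyValuePair<int, TValue>>[priorityCount];
        for (int i = 0; i < priorityCount; i++)
            _queues[i] = new Queue<KeyValuePair<int, TValue>>();
    }

    public bool TryAdd(KeyValuePair<int, TValue> item)
    {
        lock (_sync)
        {
            _queues[item.Key].Enqueue(item);
            _count++;
        }
        return true;
    }

    public bool TryTake(out KeyValuePair<int, TValue> item)
    {
        lock (_sync)
        {
            // Assumption: index 0 is served first.
            foreach (var q in _queues)
            {
                if (q.Count > 0)
                {
                    item = q.Dequeue();
                    _count--;
                    return true;
                }
            }
        }
        item = default(KeyValuePair<int, TValue>);
        return false;
    }

    public int Count
    {
        get { lock (_sync) return _count; }
    }

    public KeyValuePair<int, TValue>[] ToArray()
    {
        lock (_sync)
        {
            // Count and contents are read under the same lock, so the sizes agree.
            var result = new KeyValuePair<int, TValue>[_count];
            int index = 0;
            foreach (var q in _queues)
            {
                if (q.Count > 0)
                {
                    q.CopyTo(result, index);
                    index += q.Count;
                }
            }
            return result;
        }
    }
}
```

Because every member takes the same lock, the counter, the queues, and any snapshot stay mutually consistent, at the cost of serializing all producers and consumers.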
I think it depends on how you are going to use the class. If you limit yourself to TryAdd and TryTake (which are the two main things BlockingCollection<T> relies on), you shouldn't have any issues, and you will have a very fast, lock-minimal priority queue.
If you start using Count, CopyTo, or any of the other methods, you are potentially going to run into the issues you pointed out.