My _baseBlockContainer.GetBaseBlocks();
returns a ConcurrentQueue
with 15317
objects. For further processing I want to sort them by Type
. However, it always "misses" some objects.
It appears that my Parallel.ForEach
is not thread-safe because the amount of objects within the ConcurrentQueue
for a Type
is sometimes less (off by 1 to 250 objects for a Type
) than when sorted by the synchronous foreach
; but I do not see where/why.
var baseBlocks = _baseBlockContainer.GetBaseBlocks();
var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});
var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
if (!baseBlocksByType.ContainsKey(bb.GetType()))
{
baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByType[bb.GetType()].Enqueue(bb);
}
Replace this:
if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
with this:
baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
The problem with your existing code is that if .ContainsKey
evaluates to false
in multiple threads at the same time for the same block type, then they will all set the value corresponding to the type to a new queue, erasing any existing queue for that type. That is to say: ContainsKey
and the indexer are, by themselves, thread safe, but not when used separately in the way you are doing it.
TryAdd
is thread safe and will only add that key once, rather than rewriting it as assigning to the indexer would.