Search code examples
c#concurrencyparallel-processingparallel.foreach

Parallel.ForEach missing/ignoring/jumping over/not adding objects


My _baseBlockContainer.GetBaseBlocks(); returns a ConcurrentQueue with 15317 objects. For further processing I want to sort them by Type. However, it always "misses" some objects.

It appears that my Parallel.ForEach is not thread-safe because the amount of objects within the ConcurrentQueue for a Type is sometimes less (off by 1 to 250 objects for a Type) than when sorted by the synchronous foreach; but I do not see where/why.

var baseBlocks = _baseBlockContainer.GetBaseBlocks();

var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
  if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
  {
    baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
  }
  baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});

var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
  if (!baseBlocksByType.ContainsKey(bb.GetType()))
  {
     baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
  }
  baseBlocksByType[bb.GetType()].Enqueue(bb);
}

Solution

  • Replace this:

    if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
    {
        baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
    }
    baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
    

    with this:

    baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
    baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
    

    The problem with your existing code is that if .ContainsKey evaluates to false in multiple threads at the same time for the same block type, then they will all set the value corresponding to the type to a new queue, erasing any existing queue for that type. That is to say: ContainsKey and the indexer are, by themselves, thread safe, but not when used separately in the way you are doing it.

    TryAdd is thread safe and will only add that key once, rather than rewriting it as assigning to the indexer would.