Tags: c#, concurrency, parallel-processing, parallel.foreach

Parallel ForEach using a ConcurrentBag not working as expected


I have this code that processes items in a list:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
    }

I am basically using the ConcurrentBag to check for the existence of an object based on several conditions.
I am expecting to always get an output like (order may vary):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

But I sometimes get output like this, which suggests my code is not thread-safe:

Processing item [Three]
Processing item [One]
Processing item [Two]

So my assumption that itemsProcess.FirstOrDefault() would block was wrong.
Using the lock does not change anything. Obviously something is wrong here, and I really can't understand why.

I know I can "solve" this in other ways (one is to prepare the list before entering Parallel.ForEach()), but I would really like to know why it behaves this way.


Solution

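  • You have 2 independent operations within your parallel loop: FirstOrDefault and Add.

    ConcurrentBag makes each of these calls thread-safe on its own, but it cannot make the pair atomic: between one thread's FirstOrDefault and its Add, another thread can run the same check and also find nothing. Locking only the FirstOrDefault call, as in the commented-out lock, leaves the same gap.

    An alternative is ConcurrentDictionary, whose GetOrAdd method adds the item only when the key is not already present and otherwise returns the existing value: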

    var itemsProcess = new ConcurrentDictionary<string, Item>();
    Parallel.ForEach(items, item =>
    {
        // Returns existing item with same ID or adds this item
        var itemProcess = itemsProcess.GetOrAdd(item.ID, item);
        if (!object.ReferenceEquals(item, itemProcess))
        {
            Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
        }
        else
        {
            Console.WriteLine($"Processing item [{item.Name}]");
            // do some work...
        }
    });
    

    If you then need the processed Items as an ICollection, they can be accessed via itemsProcess.Values.
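
For comparison, the gap in the original code can also be closed by holding one lock across both the lookup and the Add, so the check-then-add sequence runs as a single unit. The following is only a minimal sketch under that assumption: the class name LockedCheckThenAdd, the Gate field, and the Run helper are hypothetical names introduced for illustration, and a plain List<Item> stands in for the ConcurrentBag because the lock already serializes every access to it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Item
{
    public string Name;
    public string ID;
}

public static class LockedCheckThenAdd
{
    // Guards the whole "look up, then add" sequence (hypothetical name).
    private static readonly object Gate = new object();

    // Returns the items that were claimed for processing, one per distinct ID.
    public static List<Item> Run(IEnumerable<Item> items)
    {
        var claimed = new List<Item>();

        Parallel.ForEach(items, item =>
        {
            Item existing;
            lock (Gate)
            {
                // Check and add happen under the same lock, so no other
                // thread can slip in between the two steps.
                existing = claimed.FirstOrDefault(a => a.ID == item.ID);
                if (existing == null)
                {
                    claimed.Add(item);
                }
            }

            if (existing != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{existing.Name}]");
            }
            else
            {
                Console.WriteLine($"Processing item [{item.Name}]");
                // do some work here, outside the lock...
            }
        });

        return claimed;
    }

    public static void Main()
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var claimed = Run(items);
        Console.WriteLine($"Claimed {claimed.Count} items");
    }
}
```

With the sample data, Run should always claim exactly two items, although whether [One] or [Three] wins for ID 123 depends on scheduling. The work itself stays outside the lock so that items with different IDs can still be processed in parallel. The lookup is a linear scan under a lock, so for larger inputs the ConcurrentDictionary approach above is likely the better fit.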