I have this code that processes items in a list:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };
        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });
        Console.ReadKey();
    }
}
I am basically using the ConcurrentBag to check whether an object already exists, based on several conditions.
I expect to always get output like this (order may vary):
Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
But sometimes I get the following output, which suggests my code is not thread-safe:
Processing item [Three]
Processing item [One]
Processing item [Two]
So my assumption that itemsProcess.FirstOrDefault() would block was wrong.
Using the lock does not change anything. Obviously something is wrong here, but I can't understand what.
I know I can "solve" this in other ways (one is to prepare the list before entering Parallel.ForEach()), but I would really like to know the reason for this behavior.
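A minimal sketch of the "prepare the list first" workaround mentioned above, assuming duplicates are defined purely by ID and that the first item in list order should win. Items are modeled as (Name, ID) tuples and the sample is written as C# 9 top-level statements to keep it self-contained; both are assumptions, not part of the original code. Deduplication happens sequentially, so the parallel loop never needs a shared lookup.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Items modeled as (Name, ID) tuples so the sketch needs no extra class.
var items = new[] { (Name: "One", ID: "123"), (Name: "Two", ID: "234"), (Name: "Three", ID: "123") };

// GroupBy keeps groups in order of first appearance and elements in source order,
// so g.First() is the earliest item for each ID.
var groups = items.GroupBy(i => i.ID).ToList();
var unique = groups.Select(g => g.First()).ToList();

foreach (var g in groups)
    foreach (var dup in g.Skip(1))
        Console.WriteLine($"Item [{dup.Name}] is a duplicate of [{g.First().Name}] and will be skipped");

var processed = new ConcurrentBag<string>();
Parallel.ForEach(unique, item =>
{
    // Every ID occurs exactly once here, so no check-then-add is needed.
    Console.WriteLine($"Processing item [{item.Name}]");
    processed.Add(item.Name);
    // do some work...
});
```

The trade-off is that deduplication is a sequential pass, but it is cheap compared with the per-item work, and the parallel part becomes trivially safe.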
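You have two independent operations within your parallel loop: FirstOrDefault and Add.
ConcurrentBag makes each of these operations thread-safe on its own, but it cannot make the pair atomic: two threads can both run FirstOrDefault for ID "123", both find nothing, and both call Add. Your lock does not help for the same reason, because it only wraps FirstOrDefault; the Add happens outside it.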
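If you want to keep the original check-then-add structure, the lock has to cover both the lookup and the insert. A minimal sketch, assuming a plain Dictionary guarded by a single lock object (gate, claimed and processedCount are illustrative names) and items modeled as (Name, ID) tuples. The slow work stays outside the lock so iterations still run in parallel. Which of "One" and "Three" claims ID "123" depends on scheduling, but exactly two items get processed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Items modeled as (Name, ID) tuples so the sketch needs no extra class.
var items = new[] { (Name: "One", ID: "123"), (Name: "Two", ID: "234"), (Name: "Three", ID: "123") };

var gate = new object();                        // one lock for check + add
var claimed = new Dictionary<string, string>(); // ID -> name of the item that claimed it
int processedCount = 0;

Parallel.ForEach(items, item =>
{
    string owner;
    bool isFirst;

    // The lookup and the insert run under the same lock, so no other thread
    // can claim the ID between the check and the Add.
    lock (gate)
    {
        if (claimed.TryGetValue(item.ID, out owner))
        {
            isFirst = false;
        }
        else
        {
            claimed.Add(item.ID, item.Name);
            owner = item.Name;
            isFirst = true;
        }
    }

    if (!isFirst)
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{owner}]");
    }
    else
    {
        Interlocked.Increment(ref processedCount);
        Console.WriteLine($"Processing item [{item.Name}]");
        Thread.Sleep(100); // do some work, outside the lock
    }
});
```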
An alternative would be ConcurrentDictionary, which has a GetOrAdd method that will only add an item when the key is not present:
var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns the existing item with the same ID, or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.ID, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});
If you then need the processed items as an ICollection, they can be accessed via itemsProcess.Values.
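A variant worth considering is ConcurrentDictionary.TryAdd, which returns a bool telling the caller whether it won the race for that key, so no reference comparison is needed. A minimal sketch, assuming items modeled as (Name, ID) tuples and the dictionary storing only the winning item's name (both are simplifications, not the original Item class). It also reads Values as an ICollection afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Items modeled as (Name, ID) tuples so the sketch needs no extra class.
var items = new[] { (Name: "One", ID: "123"), (Name: "Two", ID: "234"), (Name: "Three", ID: "123") };

var itemsProcess = new ConcurrentDictionary<string, string>(); // ID -> name of the winning item

Parallel.ForEach(items, item =>
{
    // TryAdd is atomic: for a given key, exactly one thread gets true.
    if (itemsProcess.TryAdd(item.ID, item.Name))
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
    else
    {
        // Nothing removes keys, so the entry that beat us is still present.
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemsProcess[item.ID]}]");
    }
});

// Values is exposed as ICollection<TValue>.
ICollection<string> processedNames = itemsProcess.Values;
Console.WriteLine(string.Join(", ", processedNames.OrderBy(n => n)));
```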