Tags: c#, multithreading, asynchronous, concurrency, concurrentdictionary

ConcurrentDictionary: reading and removing items in parallel


Question background:

I am currently learning about concurrent operations using the .NET 6 Parallel.ForEachAsync loop.

I have created the following program, which runs two tasks concurrently via Task.WhenAll. The KeepAliveAsync method makes HttpClient calls for a list of ten items with a maximum degree of parallelism of 3. The DisposeAsync method removes these items from the concurrent dictionary, but first the CleanItemBeforeDisposal method sets the property values of each Item object to null.

Code:

{
    [TestClass]
    public class DisposeTests
    {
        private ConcurrentDictionary<int, Item> items = new ConcurrentDictionary<int, Item>();

        private bool keepAlive = true;

        [TestMethod]
        public async Task Test()
        {
            //Arrange
            string uri = "https://website.com";

            IEnumerable<int> itemsToAdd = Enumerable.Range(1, 10);
            IEnumerable<int> itemsToDispose = Enumerable.Range(1, 10);

            foreach (var itemToAdd in itemsToAdd)
            {
                items.TryAdd(itemToAdd, new Item { Uri = uri });
            }

            //Act
            await Task.WhenAll(KeepAliveAsync(), DisposeAsync(itemsToDispose));

            //Assert
            Assert.IsTrue(items.Count == 0);
        }

        private async Task KeepAliveAsync()
        {
            HttpClient httpClient = new HttpClient();

            do
            {
                ParallelOptions parallelOptions = new()
                {
                    MaxDegreeOfParallelism = 3,
                };

                await Parallel.ForEachAsync(items.ToArray(), parallelOptions, async (item, token) =>
                {
                    var response = await httpClient.GetStringAsync(item.Value.Uri);

                    item.Value.DataResponse = response;

                    item.Value.DataResponse.ToUpper();
                });

            } while (keepAlive == true);
        }

        private async Task DisposeAsync(IEnumerable<int> itemsToRemove)
        {
            var itemsToDisposeFiltered = items.ToList().FindAll(a => itemsToRemove.Contains(a.Key));

            ParallelOptions parallelOptions = new()
            {
                MaxDegreeOfParallelism = 3,
            };

            await Parallel.ForEachAsync(itemsToDisposeFiltered.ToArray(), parallelOptions, async (itemsToDispose, token) =>
            {
                await Task.Delay(500);

                CleanItemBeforeDisposal(itemsToDispose);

                bool removed = items.TryRemove(itemsToDispose);

                if (removed == true)
                {
                    Debug.WriteLine($"DisposeAsync - Removed item {itemsToDispose.Key} from the list");
                }
                else
                {
                    Debug.WriteLine($"DisposeAsync - Did not remove item {itemsToDispose.Key} from the list");
                }
            });

            keepAlive = false;
        }

        private void CleanItemBeforeDisposal(KeyValuePair<int, Item> itemToDispose)
        {
            itemToDispose.Value.Uri = null;
            itemToDispose.Value.DataResponse = null;
        }
    }
}

The Issue:

The code runs, but I am hitting a race condition. By design, DisposeAsync calls CleanItemBeforeDisposal, which sets the Uri property of an Item to null. The parallel KeepAliveAsync method may still hold the same shared Item, so its HttpClient call is then made with a null Uri and fails with:

System.InvalidOperationException: An invalid request URI was provided. Either the request URI must be an absolute URI or BaseAddress must be set.

I have used the ToArray method on the shared ConcurrentDictionary, as I believe this creates a snapshot of the dictionary at the time it is called, but obviously this won't solve the race condition.

What is the correct way to go about handling a situation where two processes are accessing one shared list where there is the possibility one process has changed properties of an entity of that list which the other process requires?


Solution

  • I'll try to answer the question directly without getting into details about the design, etc.

    ConcurrentDictionary is thread safe, meaning that multiple threads can safely add and remove items from the dictionary. That thread safety does not apply at all to whatever objects are stored as values in the dictionary.

    If multiple threads have a reference to an instance of Item and are updating its properties, all sorts of unpredictable things can happen.
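
    A minimal sketch of why the ToArray snapshot does not help here. The Item class below is a hypothetical stand-in (the question does not show it), and SnapshotDemo is an illustrative name. The snapshot copies the key/value pairs, but each pair still points at the same Item instance that the dictionary holds:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the question's Item class, which is not shown.
public class Item
{
    public string? Uri { get; set; }
    public string? DataResponse { get; set; }
}

public static class SnapshotDemo
{
    // Returns true when a change made through the dictionary is visible
    // through a snapshot that was taken before the change.
    public static bool SnapshotSharesReferences()
    {
        var items = new ConcurrentDictionary<int, Item>();
        items.TryAdd(1, new Item { Uri = "https://website.com" });

        // ToArray copies the key/value pairs, not the Item objects themselves.
        KeyValuePair<int, Item>[] snapshot = items.ToArray();

        // Mutate the Item through the dictionary, as CleanItemBeforeDisposal does.
        items[1].Uri = null;

        // The snapshot entry is the same object, so it sees the null Uri too.
        return ReferenceEquals(snapshot[0].Value, items[1])
            && snapshot[0].Value.Uri is null;
    }
}
```

    So the dictionary protects its own structure (adds and removes), while the Item instances remain ordinary shared mutable objects.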

    To directly answer the question:

    What is the correct way to go about handling a situation where two processes are accessing one shared list where there is the possibility one process has changed properties of an entity of that list which the other process requires?

    There is no correct way to handle that possibility. If you want the code to work in a predictable way you must eliminate that possibility.

    It looks like you might have hoped that somehow the two operations will stay in sync. They won't. Even if they did, just once, it might never happen again. It's unpredictable.

    If you actually need to set Uri and DataResponse to null, it's probably better to do that to each Item in the same thread, right after you're done using those values. If you do those three things

    • Execute the request for a single Item
    • Do something with the values
    • Set them to null

    ...one after another in the same thread, it's impossible for them to happen out of order.
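
    The three steps above could be sketched roughly as follows. This is an assumption-laden illustration, not the asker's final design: the Item class is a hypothetical stand-in, SequentialPerItem and ProcessAndRemoveAsync are made-up names, and the fetch delegate stands in for httpClient.GetStringAsync so the sketch does not depend on a real endpoint.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Item class, which is not shown.
public class Item
{
    public string? Uri { get; set; }
    public string? DataResponse { get; set; }
}

public static class SequentialPerItem
{
    // fetch is a hypothetical stand-in for httpClient.GetStringAsync.
    public static async Task ProcessAndRemoveAsync(
        ConcurrentDictionary<int, Item> items,
        Func<string, CancellationToken, Task<string>> fetch)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };

        // Items are still processed in parallel, but each Item is touched
        // by exactly one invocation of this delegate.
        await Parallel.ForEachAsync(items.ToArray(), options, async (pair, token) =>
        {
            Item item = pair.Value;
            if (item.Uri is null)
            {
                return; // nothing to request for this item
            }

            // 1. Execute the request for this single Item.
            string response = await fetch(item.Uri, token);

            // 2. Do something with the values.
            item.DataResponse = response.ToUpperInvariant();

            // 3. Set them to null, then remove the entry from the dictionary.
            item.Uri = null;
            item.DataResponse = null;
            items.TryRemove(pair);
        });
    }
}
```

    Strictly speaking, code after an await may resume on a different thread pool thread, but the three steps for a given Item still run one after another, and no other task touches that Item in the meantime, which is the property that matters here.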

    (But do you need to set them to null at all? It's not clear what that's for. If you just didn't do it, then there wouldn't be a problem to solve.)