Search code examples
c#producer-consumermultitasking

Avoiding duplicate data when using multiple tasks


I'm trying to use a producer-consumer pattern in my code to make it faster. I want multiple tasks running at the same time, each one getting data, wrapping it in a custom class Product, and then adding the product to a queue. A single consumer task then saves each product to the DB through Entity Framework. I tested my current code by inserting around 1000 products into the database and then running a SQL query to check for duplicate rows. SQL Query result

As you can see in the picture, around 30 products appear more than once in the database.

This is my code:

/// <summary>
/// Downloads the Icecat XML file for every queued index on 25 parallel producer tasks,
/// builds a <c>Product</c> from each file, and saves the products to the database on a
/// single consumer task.
/// </summary>
/// <param name="indexes">Indexes passed to <c>CreateInputQueue</c> to build the work queue.</param>
/// <param name="context">Context handed to <c>InsertProductInDB</c>; only the consumer task uses it.</param>
/// <exception cref="System.AggregateException">One or more producer tasks, or the consumer task, failed.</exception>
public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
    const string url = @"https://data.Icecat.biz/export/freexml.int/en/";

    // NOTE(review): the producers only leave their foreach once this queue is marked
    // complete for adding; presumably CreateInputQueue does that — confirm.
    BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);

    // Bounded to 500 items so producers block instead of outrunning the consumer.
    BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);

    var consumer = Task.Run(() =>
    {
        try
        {
            foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
            {
                InsertProductInDB(readyProduct, context);
            }
        }
        catch
        {
            // Once the consumer dies nothing drains the bounded queue, so producers would
            // block in Add forever. Closing the queue makes their Add calls fail instead.
            productsQueue.CompleteAdding();
            throw;
        }
    });

    Task[] producers = Enumerable.Range(0, 25)
        .Select(_ => Task.Run(() =>
        {
            // NOTE(review): the helper methods below run on 25 threads at once; if any of
            // them touches shared state it must be thread-safe — confirm.
            foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
            {
                Product product = new Product();

                byte[] unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
                string xml = Encoding.UTF8.GetString(unconvertedByteArray);
                XmlDocument xmlDoc = new XmlDocument();
                xmlDoc.LoadXml(xml);

                GetProductDetails(product, xmlDoc, index);

                XmlNodeList nodeList = xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature");
                product.FeaturesLink = GetProductFeatures(product, nodeList);

                nodeList = xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture");
                product.Images = GetProductImages(nodeList);
                productsQueue.Add(product);
            }
        })).ToArray();

    try
    {
        Task.WaitAll(producers);
    }
    catch (System.AggregateException)
    {
        // Deliberately not rethrown here: the final WaitAll below reports these same
        // failures together with any failure of the consumer.
    }
    finally
    {
        // Always close the queue, even when a producer failed; otherwise the consumer
        // would wait in GetConsumingEnumerable forever.
        productsQueue.CompleteAdding();
    }

    // Waits for the consumer to drain the queue and surfaces every task failure at once.
    Task.WaitAll(producers.Concat(new[] { consumer }).ToArray());
}

All in all, my question is: what can I do to avoid this happening?


Solution

  • To avoid duplicates, try adding a new field to the Product table (let's name it Code) that contains the hash of the product's XML, and make sure to put a unique constraint on it. This way, any attempt to add a duplicate will fail due to the unique constraint/index.

    /// <summary>
    /// Computes the SHA-1 digest of <paramref name="input"/> (encoded as UTF-8) and
    /// returns it as a lowercase hexadecimal string.
    /// </summary>
    /// <param name="input">Text to hash.</param>
    /// <returns>The digest as lowercase hex, two characters per byte.</returns>
    public static string Hash(string input)
    {
        // Hash algorithm instances are IDisposable; release this one as soon as the
        // digest is computed instead of leaving it to the finalizer.
        using (var sha1 = SHA1.Create())
        {
            byte[] hash = sha1.ComputeHash(Encoding.UTF8.GetBytes(input));
            return string.Concat(hash.Select(b => b.ToString("x2")));
        }
    }
    
    /// <summary>
    /// Downloads the Icecat XML file for every queued index on 25 parallel producer tasks,
    /// builds a <c>Product</c> from each file, stores the hash of the file's XML in
    /// <c>Product.Code</c>, and saves the products to the database on a single consumer task.
    /// </summary>
    /// <param name="indexes">Indexes passed to <c>CreateInputQueue</c> to build the work queue.</param>
    /// <param name="context">Context handed to <c>InsertProductInDB</c>; only the consumer task uses it.</param>
    /// <exception cref="System.AggregateException">One or more producer tasks, or the consumer task, failed.</exception>
    public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
    {
        const string url = @"https://data.Icecat.biz/export/freexml.int/en/";

        // NOTE(review): the producers only leave their foreach once this queue is marked
        // complete for adding; presumably CreateInputQueue does that — confirm.
        BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);

        // Bounded to 500 items so producers block instead of outrunning the consumer.
        BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);

        var consumer = Task.Run(() =>
        {
            try
            {
                foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
                {
                    // NOTE(review): with a unique constraint on Code, inserting a duplicate
                    // is expected to throw here — confirm how InsertProductInDB reports it.
                    InsertProductInDB(readyProduct, context);
                }
            }
            catch
            {
                // Once the consumer dies nothing drains the bounded queue, so producers would
                // block in Add forever. Closing the queue makes their Add calls fail instead.
                productsQueue.CompleteAdding();
                throw;
            }
        });

        Task[] producers = Enumerable.Range(0, 25)
            .Select(_ => Task.Run(() =>
            {
                foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
                {
                    Product product = new Product();

                    byte[] unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
                    string xml = Encoding.UTF8.GetString(unconvertedByteArray);
                    XmlDocument xmlDoc = new XmlDocument();
                    xmlDoc.LoadXml(xml);

                    GetProductDetails(product, xmlDoc, index);

                    XmlNodeList nodeList = xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature");
                    product.FeaturesLink = GetProductFeatures(product, nodeList);

                    nodeList = xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture");
                    product.Images = GetProductImages(nodeList);

                    // Identical XML yields an identical Code, which is what lets a unique
                    // index on that column reject duplicate products.
                    product.Code = Hash(xml);
                    productsQueue.Add(product);
                }
            })).ToArray();

        try
        {
            Task.WaitAll(producers);
        }
        catch (System.AggregateException)
        {
            // Deliberately not rethrown here: the final WaitAll below reports these same
            // failures together with any failure of the consumer.
        }
        finally
        {
            // Always close the queue, even when a producer failed; otherwise the consumer
            // would wait in GetConsumingEnumerable forever.
            productsQueue.CompleteAdding();
        }

        // Waits for the consumer to drain the queue and surfaces every task failure at once.
        Task.WaitAll(producers.Concat(new[] { consumer }).ToArray());
    }