Search code examples
c#task-parallel-libraryblockingcollection

Blocking Collection shows Duplicate entries


I first retrieve Total number of rows in my table (say 100) and then divide them into chunks (say 25). Then I create a task (taskFetch) which fetches rows from MyTable in chunks into DataTable (each containing 25 records) using Parallel.Foreach() method. There's another nested Parallel,Foreach() which uses Partitioner.Create() which retrieves data from each DataTable and Adds it into Blocking Collection (sourceCollection). For testing purpose I output the result on console and it was fine.

But, as I tried to retrieve data from sourceCollection I found duplicate entries. In real I have over 1500,000 records in table. I dont exactly think that duplicate entries are Add-ed but the way I'm Take-ing is something I'm a bit doubtful about.

Code

    public async Task BulkMigrationAsync(string clearPVK, string EncZPK)
    {
        BlockingCollection<MigrationObject> sourceCollection = new BlockingCollection<MigrationObject>();
        List<Task> tasksGeneratedPinBlock = new List<Task>();
        int rangeFrom = 1;
        int recordsInSet = 25;
        int rangeTo = 25;
        int setsCount = 0;
        Dictionary<int, Tuple<int, int>> chunks = new Dictionary<int, Tuple<int, int>>();

        SqlConnection conn = new SqlConnection(_Database.ConnectionString);
        SqlCommand cmd = new SqlCommand("SELECT COUNT(*) FROM MyTable");   // Suppose If retrieves 100 rows

                // getting total records from MyTable
        using (conn)
        {
            conn.Open();
            setsCount = Convert.ToInt32(cmd.ExecuteScalar()) / recordsInSet ;   // then setsCount will be 4
            conn.Close();
        }

        for (int i = 0; i < setsCount; i++)
        {
            chunks.Add(i, new Tuple<int, int>(rangeFrom, rangeTo));
            rangeFrom = rangeTo + 1;
            rangeTo = rangeTo + recordsInSet;
        }
                // Each chunk would contain 100000 records to be preocessed later
                // chunks => {0,  (1, 25)}         
                //           {1,  (26, 50)}   // a chunk, chunk.Value.Item1 = 26 and chunk.Value.Item2 = 50
                //           {2, (51, 75)}
                //           {3, (76, 100)}
                           
                // fetching results in dataTable from DB in chunks and ADDING to sourceCollection                 

        Task taskFetch = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(chunks, (chunk) =>
            {
                DataTable dt = new DataTable();
                SqlConnection localConn = new SqlConnection(_Database.ConnectionString);
                string command = @"SELECT * FROM ( SELECT RELATIONSHIP_NUM, CUST_ID, CODE, BLOCK_NEW, ROW_NUMBER() over (
                                   order by RELATIONSHIP_NUM, CUST_ID) as RowNum   FROM MyTable) SUB
                                   WHERE SUB.RowNum BETWEEN chunk.Value.Item1 AND chunk.Value.Item2";

                SqlDataAdapter da = new SqlDataAdapter(command, localConn);

                try
                {
                    using (da)
                    using (localConn)
                    {
                        da.Fill(dt);
                    }
                }
                finally
                {
                    if (localConn.State != ConnectionState.Closed)
                        localConn.Close();
                    localConn.Dispose();
                }

                Parallel.ForEach(Partitioner.Create(0, dt.Rows.Count),
                    (range, state) =>
                    {
                        MigrationObject migSource = new MigrationObject();

                        for (int i = range.Item1; i < range.Item2; i++)
                        {
                            migSource.PAN = dt.Rows[i]["CUST_ID"].ToString();
                            migSource.PinOffset = dt.Rows[i]["CODE"].ToString();
                            migSource.PinBlockNew = dt.Rows[i]["BLOCK_NEW"].ToString();
                            migSource.RelationshipNum = dt.Rows[i]["RELATIONSHIP_NUM"].ToString();

                            Console.WriteLine(@"PAN " + migSource.PAN + " Rel " + migSource.RelationshipNum
                                                 + " for ranges : " + range.Item1 + " TO " + range.Item2);

                            sourceCollection.TryAdd(migSource);
                        }
                    });
            });
        });

        await taskFetch;
        sourceCollection.CompleteAdding();

        while (!sourceCollection.IsCompleted)
        {
            MigrationObject mig;
            if (sourceCollection.TryTake(out mig))    // Seems to be the problem area because may be im not handling out 
            {
                await Task.Delay(50);
                Console.WriteLine("     Rel " + mig.RelationshipNum + "  PAN " + mig.PAN);
            }
        }
    }

Solution

  • My bad, Actually the problem area is :

                    Parallel.ForEach(Partitioner.Create(0, dt.Rows.Count),
                    (range, state) =>
                    {
                        MigrationObject migSource = new MigrationObject();  // creating the object outside For loop.
    
                        for (int i = range.Item1; i < range.Item2; i++)
                        {
                            migSource.PAN = dt.Rows[i]["CUST_ID"].ToString();
                            migSource.PinOffset = dt.Rows[i]["CODE"].ToString();
                            migSource.PinBlockNew = dt.Rows[i]["BLOCK_NEW"].ToString();
                            migSource.RelationshipNum = dt.Rows[i]["RELATIONSHIP_NUM"].ToString();
    
                            Console.WriteLine(@"PAN " + migSource.PAN + " Rel " + migSource.RelationshipNum
                                                 + " for ranges : " + range.Item1 + " TO " + range.Item2);
    
                            sourceCollection.TryAdd(migSource);
                        }
                    });
    

    instead I should have included ' MigrationObject migSource = new MigrationObject();' inside For loop :

                    Parallel.ForEach(Partitioner.Create(0, dt.Rows.Count),
                (range, state) =>
                {
                    for (int i = range.Item1; i < range.Item2; i++)
                    {
                        MigrationObject migSource = new MigrationObject(); 
    
                        migSource.PAN = dt.Rows[i]["CUST_ID"].ToString();
                        migSource.PinOffset = dt.Rows[i]["CODE"].ToString();
                        migSource.PinBlockNew = dt.Rows[i]["BLOCK_NEW"].ToString();
                        migSource.RelationshipNum = dt.Rows[i]["RELATIONSHIP_NUM"].ToString();
    
                        Console.WriteLine(@"PAN " + migSource.PAN + " Rel " + migSource.RelationshipNum
                                             + " for ranges : " + range.Item1 + " TO " + range.Item2);
    
                        sourceCollection.TryAdd(migSource);
                    }
                });