I have an application that is hitting an API. As such, it does a query for all ID's in the object, and then has to query each item one at a time per ID. I'm doing this in a Parallel.For loop and adding each items data to a row in a datatable. Then I use sqlbulkcopy to send the datatable to a SQL server table.
If I do this without using Parallel.For, it works great. However, with Parallel.For, this line:
workrow["id"] = Guid.NewGuid();
is generating duplicate Guids. It's doing it often and causing the data to not load into the SQL server table because the id row in SQL is a Primary Key and doesn't allow duplicates. I tried locking:
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
This didn't help.
I tried not assigning an id to that field in hopes that SQL would generate it (it does have newid() on that field). That fails saying it can't insert a null.
I can't seem to just remove the id field from the datatable because then the columns don't align when I do the sqlbulkcopy.
Can someone help me here? I either need to figure out how to get Guid.NewGuid() to STOP producing duplicates OR I need to figure out a way to not pass in the id (always the first field in the datatable) so that SQL will generate the id.
Here is the code I use to generate one of tables:
public static DataTable MakeWorkflowTable()
{
DataTable Workflow = new DataTable("Workflow");
DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(id);
DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(OrgInfoID);
DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
Workflow.Columns.Add(Name);
DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
Workflow.Columns.Add(Active);
DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
Workflow.Columns.Add(Description);
DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
Workflow.Columns.Add(Object);
DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
Workflow.Columns.Add(Formula);
DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
Workflow.Columns.Add(ManageableState);
DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
Workflow.Columns.Add(NameSpacePrefix);
DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
Workflow.Columns.Add(TDACount);
DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
Workflow.Columns.Add(TriggerType);
DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(CreatedDate);
DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(CreatedBy);
DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(LastModifiedDate);
DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(LastModifiedBy);
return Workflow;
}
Here is the code I use to send it to the SQL server:
public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
{
bulkCopy.DestinationTableName =
TableName;
try
{
bulkCopy.WriteToServer(dt);
dt.Clear();
}
catch (Exception e)
{
logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
if (e.Message.ToString().Contains("PRIMARY KEY"))
{
foreach(DataRow row in dt.Rows)
{
logger.Warn("ID: {id}", row["id"]);
}
}
}
}
}
As you can see in the catch statement, I set it to write out the ID's to the log so I could see them for myself and, sure enough, there is a duplicate there. So frustrating! I really don't want to take out the Parallel.For and single thread it if I don't have to.
Per request, here is the code with the Parallel.For
if (qr.totalSize > 0)
{
object lockobject = new object();
Parallel.For(0, qr.records.Length, i =>
{
ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();
vr = (ToolingService.CustomTab1)qr.records[i];
string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
+ "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
mdqr = ts.query(mdSOQL);
vrmd = (ToolingService.CustomTab1)mdqr.records[0];
DataRow workrow = CustomTabs.NewRow();
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
workrow["OrgInfoID"] = _orgDBID;
workrow["FullName"] = vrmd.FullName;
workrow["Description"] = vrmd.Description ?? Convert.DBNull;
workrow["ManageableState"] = vrmd.ManageableState;
workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
workrow["Type"] = vrmd.Type ?? Convert.DBNull;
workrow["URL"] = vrmd.Url ?? Convert.DBNull;
workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
if (vrmd.CreatedBy == null)
{
workrow["CreatedBy"] = Convert.DBNull;
}
else
{
workrow["CreatedBy"] = vrmd.CreatedBy.Name;
}
workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
if (vrmd.LastModifiedBy == null)
{
workrow["LastModifiedBy"] = Convert.DBNull;
}
else
{
workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
}
lock (CustomTabs)
{
CustomTabs.Rows.Add(workrow);
}
});
OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");
The thing is that having to use a lock
inside the Parallel.ForEach
in a DataTable, sort of defeats the purpose of using the Parallel.ForEach
in the first place; however, I am surprised that you don't get exceptions when you call DataRow workrow = CustomTabs.NewRow();
because on my test, I get an index corrupted exception. I had to actually wrap the call to NewRow
inside a lock. Something like this:
Parallel.ForEach(data, x =>
{
DataRow row = null;
lock (lockRow)
{
row = dt.NewRow();
row["Guid"] = Guid.NewGuid();
}
...
lock(lockObj)
dt.Rows.Add(row);
Where lockObj
and lockRow
are 2 separate static objects instantiated as
static object lockObj = new object();
static object lockRow = new object();
And that worked for me, adding 1 million rows to the DataTable
and making sure that all the Guids were unique.
With all the above said, I would strongly recommend writing the code as suggested by Julian or create a class that implements IDataReader
(which you can use with SQLBulkCopy) and upload the data using that.