Search code examples
c#multithreadingblockingcollection

Foreach throwing errors when printing datatable obtained from BlockingCollection in C#


I have hit a stumbling block in my foray into multithreading. I think I know what the problem is but can't determine how to resolve it. But I may be wrong.

In summary I have producer and consumer threads. The producer threads collect data from an external source into datatables, and then put them into a collection. The consumer then grabs the datatables from the collection. I am using BlockingCollection as a public static collection, so that it can be accessible to both threads, which exists in two different classes. I'll show the main parts of the code now then explain what is and isn't working.

Producer Thread:

try
{
     dataTable.Clear();
     adapter.Fill(dataTable);
     dataCaptured = true;
     timeout = 0;
     ThreadInfo.setCurrentDate(startDate);
     ThreadInfo.dataTableCollection.Add(dataTable);
}

Consumer Threads

while(true)
{
     DataTable testTable = ThreadInfo.dataTableCollection.Take();
     foreach (DataRow datarow in testTable.Rows)
     {
          foreach (var item in datarow.ItemArray)
          {
                Console.WriteLine(item);
          }
     }
}

So my tests show that as the producer thread creates datatables it successfully adds them to the collection. I can see this by using count both before and after the add method. Counting the number of rows in each table I can also confirm that the table added is the same table that was created. Furthermore, the take method also successfully removes a table and that table matches those entered. I know this both from counting the number of tables in the collection, and counting the rows in the 'taken' datatable.

My problem is when I try to run the foreach loop to print out the results. Initially it works and starts to print the data to screen, but then throws this error:

System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Collection was modified; enumeration operation might not execute.
  Source=System.Data
  StackTrace:
       at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
       at pullPlexTable.InputThreads.dataConsumerThread() in \\srv-file01\users$\dkb\Visual Studio 2013\Projects\pullPlexTable\pullPlexTable\InputThread.cs:line 39
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException:

I believe this seems to be happening at the point that a new datatable is added to the collection. I believe this is supported by the error which shows that .movenext() encountered the error, which means it was attempting to move to the next value in the collection?

But I really don't know what to do or even if i am right. I have tried copying the datatable and that stills encounters and error. I would have thought that once it created a datatable and copied the datatable using the take method from the collection I would be able to iterate over it as I please. My hunch would be that the datatable is still pointing to the datatable in the collection and when that gets bumped down as a new datatable enters it throws the error. But that is complete speculation and probably completely wrong. Can anyone help?

If you need further information I would be happy to post it.


Solution

  • You need to create a new DataTable every time - they are mutable, and passed by reference.

    What you've accomplished so far is make the reference itself thread-safe. But that's not enough - all the threads are still sharing the same instance of the DataTable.

    Instead, the producer might look something like this:

     var dt = new DataTable();
     adapter.Fill(dt);
    
     ThreadInfo.dataTableCollection.Add(dt);
    

    I've removed the other stuff you had around, because it's very likely that was wrong as well - any resources you share accross threads have to be thread-safe or synchronized. The easiest way would be to ensure you can only read and write them inside of a lock, synchronizing access explicitly:

    private static object syncObject = new object();
    
    private static DateTime currentDate;
    public static DateTime CurrentDate
    {
      get { lock (syncObject) return currentDate; }
      set { lock (syncObject) currentDate = value; }
    }
    

    Those are just the very basics. You really don't want to go around guessing in a multi-threaded environment. Multi-threading is hard. At the very least, I'd recommend going through the brilliant http://www.albahari.com/threading/ - it will teach you the basic concepts. And humility :D

    The easiest way to ensure safe multi-threading is making sure you never pass any mutable object between threads - of course, it's very hard to make sure of that with statics; a good guideline is that any public static members must be thread-safe by default. If you're only ever running a new Task with given (immutable or non-shared) arguments, and working with the return value, you make multi-threading a lot easier.

    For a producer-consumer queue, make sure you're passing either immutable data, or something you're not going to reuse (e.g. the "new data table, fill it, pass it, forget it" approach). If you need any signalling not provided by the BlockingCollection itself, make sure it's thread-safe. Ideally, you want to use as high level constructs as possible - CancellationToken, Task, ManualResetEvent etc. - see the link above. It will save you a lot of trouble if you use by-value types (all the way down - wrapping a reference in a struct isn't going to help you, obviously), but even then, you need to lock around both reading and writing.