Linq In Parallel.ForEach


I have a foreach loop with Linq queries inside.

Everything runs great until I change the foreach to Parallel.ForEach:

    // get the task info
    Log("Populate task, guf code lists ...........................");
    List<SF_CO_ITEM> tasks = (from coi in ctx.SF_CO_ITEM
                              where coi.CO == co.ID
                              select coi).ToList();

    // foreach (SF_CO_ITEM t in tasks)
    // {
    Parallel.ForEach(tasks, t =>
    {
        Log("Executing on t: " + t.ID);

        // exception on next line:
        List<SF_CO_LINE_ITEM> gufs = (from coli in ctx.SF_CO_LINE_ITEM
                                      where coli.CO_ITEM == t.ID
                                      select coli).ToList();

And the exception I get is:

    System.AccessViolationException was unhandled
    Message=Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
    Source=Oracle.DataAccess
    StackTrace:
       at Oracle.DataAccess.Client.OpsCon.Open(IntPtr& opsConCtx, IntPtr& opsErrCtx, OpoConValCtx* pOpoConValCtx, OpoConRefCtx& pOpoConRefCtx)
       at Oracle.DataAccess.Client.ConnectionDispenser.Open(OpoConCtx opoConCtx)
       at Oracle.DataAccess.Client.OracleConnection.Open()
       at System.Data.EntityClient.EntityConnection.OpenStoreConnectionIf(Boolean openCondition, DbConnection storeConnectionToOpen, DbConnection originalConnection, String exceptionCode, String attemptedOperation, Boolean& closeStoreConnectionOnFailure)
       at System.Data.EntityClient.EntityConnection.Open()
       at System.Data.Objects.ObjectContext.EnsureConnection()
       at System.Data.Objects.ObjectQuery1.GetResults(Nullable1 forMergeOption)
       at System.Data.Objects.ObjectQuery1.System.Collections.Generic.IEnumerable<T>.GetEnumerator()
       at System.Collections.Generic.List1..ctor(IEnumerable1 collection)
       at System.Linq.Enumerable.ToList[TSource](IEnumerable1 source)
       at ChangeOrder.Program.<>c_DisplayClass19.b_16(SF_CHANGE_ORDER_ITEM t) in C:\VS_apps\PMConsole\PMC Tools\ChangeOrderExecution\Program.cs:line 220
       at System.Threading.Tasks.Parallel.<>c_DisplayClass2d2.<ForEachWorker>b__23(Int32 i)
       at System.Threading.Tasks.Parallel.<>c__DisplayClassf1.b_c()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
       at System.Threading.Tasks.Task.<>c_DisplayClass7.b_6(Object )
       at System.Threading.Tasks.Task.ExecuteSelfReplicating(Task root)
       at System.Threading.Tasks.Task.Execute()
       at System.Threading.Tasks.Task.ExecutionContextCallback(Object obj)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)
       at System.Threading.Tasks.Task.ExecuteEntry(Boolean bPreventDoubleExecution)
       at System.Threading.Tasks.ThreadPoolTaskScheduler.TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued)
       at System.Threading.Tasks.TaskScheduler.TryRunInline(Task task, Boolean taskWasPreviouslyQueued, Object threadStatics)
       at System.Threading.Tasks.Task.InternalRunSynchronously(TaskScheduler scheduler)
       at System.Threading.Tasks.Task.RunSynchronously(TaskScheduler scheduler)
       at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action1 body, Action2 bodyWithState, Func4 bodyWithLocal, Func1 localInit, Action1 localFinally)
       at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList1 list, ParallelOptions parallelOptions, Action1 body, Action2 bodyWithState, Action3 bodyWithStateAndIndex, Func4 bodyWithStateAndLocal, Func5 bodyWithEverything, Func1 localInit, Action1 localFinally)
       at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable1 source, ParallelOptions parallelOptions, Action1 body, Action2 bodyWithState, Action3 bodyWithStateAndIndex, Func4 bodyWithStateAndLocal, Func5 bodyWithEverything, Func1 localInit, Action1 localFinally)
       at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable1 source, Action`1 body)
       at ChangeOrder.Program.PerformChangeOrder(SF_CHANGE_ORDER co, SF_CLIENT_PROJECT cp, SFEntitiesQA ctx) in C:\VS_apps\PMConsole\PMC Tools\ChangeOrderExecution\Program.cs:line 216
       at ChangeOrder.Program.Main(String[] args) in C:\VS_apps\PMConsole\PMC Tools\ChangeOrderExecution\Program.cs:line 1373
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
    InnerException:

I'm not sure what I would need to lock, since I am only reading data (read-only access, right?).

I've thought about adding "AsParallel", but my understanding is that it is a PLINQ operator that only parallelizes the query it is applied to.

I can't find any examples of people running Linq queries inside Parallel.ForEach loops, so I'm not even sure what I'm doing is allowed.


Solution

  • Why not just do the join itself instead of getting each one individually? Since it looks like this is hitting a DB, your LINQ provider should just compose the query and get you the rows you're looking for. Try this:

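    List<SF_CO_LINE_ITEM> gufs;
    var query = from coi in ctx.SF_CO_ITEM
                where coi.CO == co.ID
                join coli in ctx.SF_CO_LINE_ITEM
                    on coi.ID equals coli.CO_ITEM   // a join clause uses 'equals', not '=='
                select coli;
    // Confirm what the query looks like before running it. With an ObjectContext
    // (as in your stack trace) that should be ((ObjectQuery)query).ToTraceString().
    gufs = query.ToList();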
    

    I usually separate the query from the actual enumeration/materialization so that I can check the query looks the way I want before it runs. If this is a 1:M relationship between SF_CO_ITEM and SF_CO_LINE_ITEM, you can do a GroupJoin instead by changing the join to:

    join coli in ctx.SF_CO_LINE_ITEM
        on coi.ID equals coli.CO_ITEM into tcoli
    from tc in tcoli
    select tc
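
    To see the difference between the two join shapes without touching the database, here is a minimal sketch against in-memory lists. The Item and LineItem classes and their sample data are made-up stand-ins for SF_CO_ITEM and SF_CO_LINE_ITEM, so treat this as an illustration of the query syntax rather than of what your provider will generate:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    // Hypothetical stand-ins for the SF_CO_ITEM and SF_CO_LINE_ITEM entities.
    public class Item { public int ID; public int CO; }
    public class LineItem { public int ID; public int CO_ITEM; }

    public static class JoinSketch
    {
        // Made-up sample data: items 1 and 2 belong to change order 10.
        public static readonly List<Item> Items = new List<Item>
        {
            new Item { ID = 1, CO = 10 },
            new Item { ID = 2, CO = 10 },
            new Item { ID = 3, CO = 99 },
        };

        public static readonly List<LineItem> Lines = new List<LineItem>
        {
            new LineItem { ID = 100, CO_ITEM = 1 },
            new LineItem { ID = 101, CO_ITEM = 1 },
            new LineItem { ID = 102, CO_ITEM = 3 },
        };

        // Plain join: one result per matching line item, in a flat list.
        public static List<LineItem> FlatJoin(int coId)
        {
            var query = from coi in Items
                        where coi.CO == coId
                        join coli in Lines on coi.ID equals coli.CO_ITEM
                        select coli;
            return query.ToList();
        }

        // Group join: one result per item, carrying its (possibly empty)
        // group of line items, which preserves the 1:M shape.
        public static Dictionary<int, List<int>> GroupedJoin(int coId)
        {
            var query = from coi in Items
                        where coi.CO == coId
                        join coli in Lines on coi.ID equals coli.CO_ITEM into tcoli
                        select new { coi.ID, LineIds = tcoli.Select(l => l.ID).ToList() };
            return query.ToDictionary(x => x.ID, x => x.LineIds);
        }

        // Prints the results of both query shapes for change order 10.
        public static void Demo()
        {
            Console.WriteLine("flat rows: " + FlatJoin(10).Count);
            foreach (var pair in GroupedJoin(10))
                Console.WriteLine("item " + pair.Key + ": " + pair.Value.Count + " line item(s)");
        }
    }
    ```

    The flat version is enough if you only need the line items. The grouped version is the one to reach for if you still want to process each task together with its own line items, as your loop does.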
    

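    As for why you're getting this exception, it most likely has to do with using the same context (and the connection it opens) from several threads at once; your stack trace fails while the context is opening its connection inside the parallel loop body. As the MSDN article on parallel loops puts it: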

    Hidden Loop Body Dependencies

    Incorrect analysis of loop dependencies is a frequent source of software defects. Be careful that all parallel loop bodies do not contain hidden dependencies. This is a mistake that's easy to make.

    The case of trying to share an instance of a class such as Random or DbConnection, which are not thread safe, across parallel iterations is an example of a subtle dependency.

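    So your simplest options are to keep the loop sequential or to change the original query to a join so that you get the right data the first time around. If the loop really has to stay parallel, each iteration would need its own context rather than the shared ctx, since context instances are not thread safe.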
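
    For completeness, the shared dependency can also be removed by giving every iteration its own context instead of sharing ctx. Below is a minimal sketch of that pattern. FakeContext and LineItemIdsFor are hypothetical stand-ins so the example runs without a database; in your code it would be a new instance of your entity context. I have not verified that this avoids the Oracle provider error in your setup, and opening a connection per iteration may well cost more than the single join:

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    // Hypothetical stand-in for an entity context: like a real one, an
    // instance is meant to be used by only one thread at a time.
    public sealed class FakeContext : IDisposable
    {
        // Pretend query: item N has N line items with made-up IDs.
        public List<int> LineItemIdsFor(int itemId)
        {
            return Enumerable.Range(0, itemId).Select(i => itemId * 100 + i).ToList();
        }

        public void Dispose() { }
    }

    public static class PerIterationContextSketch
    {
        public static IDictionary<int, List<int>> LoadAll(IEnumerable<int> itemIds)
        {
            // Results are collected in a thread-safe dictionary.
            var results = new ConcurrentDictionary<int, List<int>>();

            Parallel.ForEach(itemIds, id =>
            {
                // A fresh context per iteration, so no context instance
                // is ever touched by two threads.
                using (var ctx = new FakeContext())
                {
                    results[id] = ctx.LineItemIdsFor(id);
                }
            });

            return results;
        }
    }
    ```

    Calling PerIterationContextSketch.LoadAll(new[] { 1, 2, 3 }) returns one entry per item ID. The only object shared between iterations is the ConcurrentDictionary, which is designed for concurrent writes.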

    Hope that helps!