concurrency, c#-4.0, plinq, pfx

While using ConcurrentQueue, trying to dequeue while looping through in parallel


I am using the parallel data structures in my .NET 4 application and I have a ConcurrentQueue that gets added to while I am processing through it.

I want to do something like:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

as I make database calls to save the data, so I am limiting the number of concurrent threads.

But, I expect that the ForAll isn't going to dequeue, and I am concerned about just doing

ForAll(i => {
    personqueue.TryDequeue(...);
    ...
});

as there is no guarantee that I am popping off the correct one.

So, how can I iterate through the collection and dequeue in a parallel fashion?

Or, would it be better to use PLINQ to do this processing, in parallel?


Solution

  • Well, I'm not 100% sure what you are trying to achieve here. Are you trying to dequeue all items until nothing is left? Or just dequeue a lot of items in one go?

    The first, probably unexpected, behavior starts with this statement:

     theQueue.AsParallel()
    

    For a ConcurrentQueue, you get a 'snapshot' enumerator. So when you iterate over a concurrent queue, you only iterate over the snapshot, not the 'live' queue.

    In general I think it's not a good idea to iterate over something you're changing during the iteration.
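
    To make the snapshot behavior concrete, here is a minimal sketch. The class and method names (SnapshotDemo, Run) are made up for illustration and the queue holds plain ints; it only shows that enumerating neither dequeues anything nor sees items added after the enumeration started.

    ```csharp
    using System.Collections.Concurrent;

    public static class SnapshotDemo
    {
        // Hypothetical helper: returns { itemsVisited, itemsLeftInQueue }.
        public static int[] Run()
        {
            var queue = new ConcurrentQueue<int>(new[] { 1, 2, 3 });
            int visited = 0;

            // foreach walks a snapshot taken when the enumerator is created.
            foreach (int item in queue)
            {
                queue.Enqueue(item + 10); // goes to the live queue, not to the snapshot
                visited++;
            }

            // Nothing was removed by enumerating; the new items are still queued.
            return new[] { visited, queue.Count };
        }
    }
    ```

    With three initial items, the loop body should run three times and the queue should end up holding six items, so a ForAll over AsParallel() would process the snapshot and leave everything in the queue.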

    So another solution would look like this:

            // This makes it clearer that we only dequeue theQueue.Count times
            // (Count is read once, before the loop starts).
            // Afterwards the queue is probably not empty,
            // or it may run empty before the loop finishes.
            Parallel.For(0, theQueue.Count,
                         new ParallelOptions { MaxDegreeOfParallelism = 20 },
                         i =>
                         {
                             Person person; // Person: assumed element type of theQueue
                             if (theQueue.TryDequeue(out person))
                             {
                                 // and stuff
                             }
                         });
    

    This avoids manipulating something while iterating over it. However, after that statement the queue can still contain data that was added during the for loop.
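
    The leftover items can be shown with a small self-contained sketch. Everything here is an assumption for illustration: the names FixedCountDemo and Run, the int payload, the degree of parallelism of 4, and the rule that every tenth original item enqueues one new item to simulate a producer.

    ```csharp
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public static class FixedCountDemo
    {
        // Hypothetical helper: returns { itemsProcessed, itemsLeftInQueue }.
        public static int[] Run()
        {
            var queue = new ConcurrentQueue<int>();
            for (int n = 0; n < 100; n++) queue.Enqueue(n);

            int processed = 0;
            // queue.Count is evaluated once, so the loop runs exactly 100 times.
            Parallel.For(0, queue.Count,
                new ParallelOptions { MaxDegreeOfParallelism = 4 },
                i =>
                {
                    int item;
                    if (!queue.TryDequeue(out item)) return;
                    Interlocked.Increment(ref processed);
                    // Simulated producer: some items add new work while we process.
                    if (item < 100 && item % 10 == 0) queue.Enqueue(item + 1000);
                });

            return new[] { processed, queue.Count };
        }
    }
    ```

    Because the queue is FIFO, the 100 iterations take the 100 original items, and the 10 items added along the way are still in the queue when Parallel.For returns.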

    To get the queue empty at some moment in time, you probably need a little more work. Here's a really ugly solution: while the queue still has items, create new tasks. Each task dequeues from the queue for as long as it can. At the end, we wait for all tasks to finish. To limit the parallelism, we never create more than 20 tasks.

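            // Probably a kitty died because of this ugly code ;)
            // However, this code tries to empty the queue in a very aggressive way
            Action consumeFromQueue = () =>
            {
                Person person; // Person: assumed element type of theQueue
                while (theQueue.TryDequeue(out person))
                {
                    // do your stuff
                }
            };
            // A list instead of a fixed-size array: the loop below can stop early,
            // and Task.WaitAll throws if the array contains null entries.
            var allRunningTasks = new List<Task>();
            for (int i = 0; i < MaxParallelism && theQueue.Count > 0; i++)
            {
                allRunningTasks.Add(Task.Factory.StartNew(consumeFromQueue));
            }
            Task.WaitAll(allRunningTasks.ToArray());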
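
    For completeness, here is the same worker-task idea as a self-contained sketch. The names QueueDrainer, Drain and handle are hypothetical, and the per-item work is passed in as a delegate because the real processing (the database call) is not shown in the question. It returns once every started worker has found the queue empty; a producer that is still running can of course add items after that.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public static class QueueDrainer
    {
        // Starts at most maxParallelism tasks; each one dequeues until
        // TryDequeue fails. Returns the number of items handled.
        public static int Drain<T>(ConcurrentQueue<T> queue, int maxParallelism, Action<T> handle)
        {
            int handled = 0;
            Action worker = () =>
            {
                T item;
                while (queue.TryDequeue(out item))
                {
                    handle(item); // e.g. save the item to the database
                    Interlocked.Increment(ref handled);
                }
            };

            var tasks = new List<Task>();
            for (int i = 0; i < maxParallelism && !queue.IsEmpty; i++)
            {
                tasks.Add(Task.Factory.StartNew(worker));
            }
            Task.WaitAll(tasks.ToArray()); // an empty array returns immediately
            return handled;
        }
    }
    ```

    A call could look like QueueDrainer.Drain(personqueue, 20, p => Save(p)), where Save stands in for whatever persists a single item.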