Tags: c#, asynchronous, task-parallel-library, concurrent-collections

How to use tasks with ConcurrentDictionary


I have to write a program that reads the queues to process from a database. All the queues run in parallel and are managed on the parent thread using a ConcurrentDictionary. I have a class that represents a queue; its constructor takes the queue information and a handle to the parent instance. The queue class also has the method that processes the queue.

Here is the Queue Class:

class MyQueue
{
    protected ServiceExecution _parent;
    protected string _queueID;

    public MyQueue(ServiceExecution parentThread, string queueID)
    {
        _parent = parentThread;
        _queueID = queueID;
    }

    public void Process()
    {
        try
        {
            // Do work to process
        }
        catch (Exception)
        {
            // Exception handling
        }
        finally
        {
            // Always tell the parent that this queue has finished
            _parent.ThreadFinish(_queueID);
        }
    }
}

The parent thread loops through the dataset of queues and instantiates a new queue object for each row. It creates a new thread to execute the Process method of the queue object asynchronously. The queue is added to the ConcurrentDictionary and the thread is then started as follows:

private ConcurrentDictionary<string, MyQueue> _runningQueues = new ConcurrentDictionary<string, MyQueue>();

foreach (DataRow dr in QueueDataset.Rows)
{
    MyQueue queue = new MyQueue(this, dr["QueueID"].ToString());
    Thread t = new Thread(() => queue.Process());
    // Start the thread only if this queue is not already running
    if (_runningQueues.TryAdd(dr["QueueID"].ToString(), queue))
    {
        t.Start();
    }
}

//Method that gets called by the queue thread when it finishes
public void ThreadFinish(string queueID)
{
    MyQueue queue;
    _runningQueues.TryRemove(queueID, out queue);
}

I have a feeling this is not the right approach to manage the asynchronous queue processing and I'm wondering if perhaps I can run into deadlocks with this design? Furthermore, I would like to use Tasks to run the queues asynchronously instead of the new Threads. I need to keep track of the queues because I will not spawn a new thread or task for the same queue if the previous run is not complete yet. What is the best way to handle this type of parallelism?

Thanks in advance!


Solution

  • About your current approach

    Indeed, it is not the right approach. A large number of queues read from the database will spawn an equally large number of threads, and each thread costs memory for its stack and adds context-switching overhead. You also create a new thread every time. It is better to create a fixed set of threads and reuse them. If you want tasks, create LongRunning tasks and reuse them.


    Suggested Design

    I'd suggest the following design:

    1. Reserve a single task to read the queues from the database and put them in a BlockingCollection;
    2. Start multiple LongRunning tasks, each of which takes one queue at a time from that BlockingCollection and processes it;
    3. When a task has finished processing the queue it took from the BlockingCollection, it takes the next queue from that BlockingCollection;
    4. Tune the number of processing tasks so that they make good use of your CPU cores. Since DB interactions are usually slow, you can create about three times as many tasks as there are cores; however, YMMV.

    Deadlock possibility

    Deadlocks will at least not happen on the application side. However, since the queues represent database transactions, a deadlock may occur on the database side. You may have to write some logic that makes a task start its transaction again if the database rolled it back because of a deadlock.
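
    The retry logic mentioned above could look roughly like the sketch below. It is only an illustration under assumptions: DeadlockRetry, Run, work and isDeadlock are hypothetical names that do not appear in the question, and how a deadlock rollback is detected depends on your database provider (with SQL Server, for example, the predicate would typically check for SqlException error number 1205).

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the original code.
public static class DeadlockRetry
{
    // Runs `work` and re-runs it when `isDeadlock` reports that the failure
    // was a deadlock rollback. Any other exception, or a deadlock on the
    // last attempt, is rethrown to the caller.
    public static void Run(Action work, Func<Exception, bool> isDeadlock,
                           int maxAttempts = 3, int baseDelayMs = 100)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // Assumed to open, execute and commit one whole transaction.
                work();
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts && isDeadlock(ex))
            {
                // Wait a little longer after each failed attempt.
                Thread.Sleep(baseDelayMs * attempt);
            }
        }
    }
}
```

    A processing task would then wrap the transactional part of its work, for example DeadlockRetry.Run(() => q.Process(), ex => /* provider-specific deadlock check */ false). Note that this only helps if Process lets the deadlock exception escape instead of swallowing it in its own catch block.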


    Sample Code

    private static void TaskDesignedRun()
    {
        var expectedParallelQueues = 1024; //Optimize it. I've chosen it randomly
        var parallelProcessingTaskCount = 4 * Environment.ProcessorCount; //Optimize this too.
        var baseProcessorTaskArray = new Task[parallelProcessingTaskCount];
        var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
    
        var itemsToProcess = new BlockingCollection<MyQueue>(expectedParallelQueues);
    
        //Start a new task to populate the "itemsToProcess"
        taskFactory.StartNew(() =>
        {
            // Add code to read queues and add them to itemsToProcess
            Console.WriteLine("Done reading all the queues...");
            // Finally signal that you are done by saying..
            itemsToProcess.CompleteAdding();
        });
    
        //Initializing the base tasks
        for (var index = 0; index < baseProcessorTaskArray.Length; index++)
        {
            baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
            {
                // GetConsumingEnumerable blocks until an item is available and ends
                // once CompleteAdding has been called and the collection is empty.
                foreach (var q in itemsToProcess.GetConsumingEnumerable())
                {
                    // Process your queue
                    q.Process();
                }
            });
        }

        // Now just wait till all queues in your database have been read and processed.
        Task.WaitAll(baseProcessorTaskArray);
    }
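
    The sample above does not cover the requirement from the question that a queue must not be started again while its previous run is still active. One possible way to track that with tasks is sketched below. QueueRunner, TryStart and WhenAllRunning are hypothetical names introduced for illustration; the idea is simply to register a not-yet-started task in a ConcurrentDictionary with TryAdd, start it only if the registration succeeded, and remove the entry when the work finishes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original code.
public class QueueRunner
{
    // Key: queue ID. Value: the task currently processing that queue.
    private readonly ConcurrentDictionary<string, Task> _running =
        new ConcurrentDictionary<string, Task>();

    // Starts `process` for `queueID` unless a previous run is still active.
    // Returns true when a new task was started.
    public bool TryStart(string queueID, Action process)
    {
        // Create the task without starting it, so the entry is registered
        // before the work (and its cleanup) can possibly run.
        var task = new Task(() =>
        {
            try
            {
                process();
            }
            finally
            {
                Task removed;
                _running.TryRemove(queueID, out removed);
            }
        });

        if (!_running.TryAdd(queueID, task))
            return false; // the previous run of this queue has not finished

        task.Start();
        return true;
    }

    // Completes when every task that is running right now has finished.
    public Task WhenAllRunning()
    {
        return Task.WhenAll(_running.Values);
    }
}
```

    Because TryAdd is atomic, two callers racing to start the same queue ID cannot both succeed, and the parent no longer needs a ThreadFinish callback since the task removes its own entry.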