I have to write a program that reads the queues to process from a database. All the queues run in parallel and are managed on the parent thread using a ConcurrentDictionary. I have a class that represents a queue; its constructor takes the queue information and a handle to the parent instance. The queue class also has the method that processes the queue.
Here is the Queue Class:
class MyQueue
{
    protected ServiceExecution _parent;
    protected string _queueID;

    public MyQueue(ServiceExecution parentThread, string queueID)
    {
        _parent = parentThread;
        _queueID = queueID;
    }

    public void Process()
    {
        try
        {
            // Do work to process
        }
        catch (Exception)
        {
            // exception handling
        }
        finally
        {
            // Always tell the parent this queue is done, even after a failure
            _parent.ThreadFinish(_queueID);
        }
    }
}
The parent thread loops through the dataset of queues and instantiates a new queue object for each row. It creates a new thread to execute the Process method of the queue object asynchronously. The queue is added to the ConcurrentDictionary and, if that succeeds, the thread is started as follows:
private ConcurrentDictionary<string, MyQueue> _runningQueues = new ConcurrentDictionary<string, MyQueue>();

foreach (DataRow dr in QueueDataset.Rows)
{
    MyQueue queue = new MyQueue(this, dr["QueueID"].ToString());
    Thread t = new Thread(() => queue.Process());
    // TryAdd fails if this queue ID is already registered, i.e. still running
    if (_runningQueues.TryAdd(dr["QueueID"].ToString(), queue))
    {
        t.Start();
    }
}

// Method that gets called by the queue thread when it finishes
public void ThreadFinish(string queueID)
{
    MyQueue queue;
    _runningQueues.TryRemove(queueID, out queue);
}
I have a feeling this is not the right approach to manage the asynchronous queue processing and I'm wondering if perhaps I can run into deadlocks with this design? Furthermore, I would like to use Tasks to run the queues asynchronously instead of the new Threads. I need to keep track of the queues because I will not spawn a new thread or task for the same queue if the previous run is not complete yet. What is the best way to handle this type of parallelism?
Thanks in advance!
Indeed, it is not the right approach. You create a new thread for every queue, so a large number of queues read from the database will spawn an equally large number of threads, which can be costly. It is better to create a fixed set of threads and re-use them. And if you want tasks, it is better to create LongRunning tasks and re-use them.
I'd suggest the following design: start one task that reads the queues from the database and adds them to a BlockingCollection, and start a fixed number of LongRunning tasks that each take a queue from that BlockingCollection and process it.
As for deadlocks: they will at least not happen on the application side. However, since the queues consist of database transactions, a deadlock may happen at the database end. You may have to write some logic to make your task start a transaction again if the database rolled it back because of a deadlock.
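The retry logic mentioned above might look roughly like the following. This is only a sketch: DeadlockRetry, Run, isDeadlock, maxAttempts and baseDelayMs are names made up for illustration, and how a deadlock is recognised depends on your database driver (with SQL Server, for example, the deadlock-victim error surfaces as SqlException.Number == 1205).

```csharp
using System;
using System.Threading;

public static class DeadlockRetry
{
    // Runs 'work' and retries it when 'isDeadlock' reports that the failure
    // was a deadlock rollback. Both delegates are supplied by the caller;
    // 'isDeadlock' is a hypothetical predicate, not part of any library.
    public static void Run(Action work, Func<Exception, bool> isDeadlock,
                           int maxAttempts = 3, int baseDelayMs = 100)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                work(); // should open, run and commit one whole transaction
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts && isDeadlock(ex))
            {
                // Back off a little longer after each failed attempt, then retry.
                Thread.Sleep(baseDelayMs * attempt);
            }
            // Any other exception, or a deadlock on the last attempt, propagates.
        }
    }
}
```

The delegate passed as work should contain the entire transaction, so that each retry starts from a clean state rather than resuming a transaction that was already rolled back.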
private static void TaskDesignedRun()
{
    var expectedParallelQueues = 1024; // Optimize it. I've chosen it randomly
    var parallelProcessingTaskCount = 4 * Environment.ProcessorCount; // Optimize this too.
    var baseProcessorTaskArray = new Task[parallelProcessingTaskCount];
    var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
    var itemsToProcess = new BlockingCollection<MyQueue>(expectedParallelQueues);

    // Start a new task to populate "itemsToProcess"
    taskFactory.StartNew(() =>
    {
        // Add code to read queues and add them to itemsToProcess
        Console.WriteLine("Done reading all the queues...");
        // Finally signal that you are done by saying..
        itemsToProcess.CompleteAdding();
    });

    // Initializing the base tasks
    for (var index = 0; index < baseProcessorTaskArray.Length; index++)
    {
        baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
        {
            // IsCompleted is true only once adding is complete AND the collection is empty,
            // so workers keep running while the reader is still filling the collection.
            while (!itemsToProcess.IsCompleted)
            {
                MyQueue q;
                // Wait up to 100 ms for an item instead of spinning on an empty collection
                if (!itemsToProcess.TryTake(out q, 100)) continue;
                // Process your queue
            }
        });
    }

    // Now just wait till all queues in your database have been read and processed.
    Task.WaitAll(baseProcessorTaskArray);
}
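The question also requires that a queue is not started again while its previous run is still in progress. One way to keep that guarantee with the BlockingCollection design is a small thread-safe set of in-flight queue IDs. The sketch below is an assumption about how that could be done, not part of the original code: InFlightQueues, TryClaim, Release and ProcessAndRelease are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

public class InFlightQueues
{
    // Used as a thread-safe set: the keys are the IDs of queues that are
    // waiting in the collection or currently being processed.
    private readonly ConcurrentDictionary<string, byte> _inFlight =
        new ConcurrentDictionary<string, byte>();

    // Reader side: returns true if the queue was not in flight and is now
    // claimed, meaning the caller may add it to the BlockingCollection.
    public bool TryClaim(string queueId)
    {
        return _inFlight.TryAdd(queueId, 0);
    }

    // Worker side: marks the queue as finished so a later run can claim it.
    public void Release(string queueId)
    {
        byte ignored;
        _inFlight.TryRemove(queueId, out ignored);
    }

    // Runs 'process' and releases the claim even if it throws.
    public void ProcessAndRelease(string queueId, Action process)
    {
        try
        {
            process();
        }
        finally
        {
            Release(queueId);
        }
    }
}
```

The reading task would call TryClaim before adding a queue to itemsToProcess and skip the queue when it returns false; each worker would wrap its processing in ProcessAndRelease. This replaces the ThreadFinish callback, so the queue class no longer needs a handle to its parent.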