I have tasks running that call a method that reads from RabbitMQ. When there is nothing in the queue, the method simply blocks. So the tasks have a "running" status, but aren't actually doing anything. Is there any way to gracefully end these tasks?
The code that accesses the queue is as follows:
private void FindWork(CancellationToken ct)
{
    if (ct.IsCancellationRequested)
        return;
    bool result = false;
    bool process = false;
    bool queueResult = false;
    Work_Work work = null;
    try
    {
        using (Queue workQueue = new Queue(_workQueue))
        {
            // Look for work on the work queue
            workQueue.Open(Queue.Mode.Consume);
            work = workQueue.ConsumeWithBlocking<Work_Work>();
            // Do some work with the message ...
            return;
The tasks are created as follows:
private void Run()
{
    while (!_stop)
    {
        // Remove any stopped tasks from the pool
        List<int> removeThreads = new List<int>();
        lock (_tasks)
        {
            foreach (KeyValuePair<int, Task> task in _tasks)
            {
                if (task.Value.Status != TaskStatus.Running)
                {
                    task.Value.Wait();
                    removeThreads.Add(task.Value.Id);
                }
            }
            foreach (int taskID in removeThreads)
                _tasks.Remove(taskID);
        }
        CancellationToken ct = _cts.Token;
        TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);
        // Create new tasks if we have room in the pool
        while (_tasks.Count < _runningMax)
        {
            Task task = factory.StartNew(() => FindWork(ct));
            lock (_tasks)
                _tasks.Add(task.Id, task);
        }
        // Take a rest so we don't run the CPU to death
        Thread.Sleep(1000);
    }
}
Currently I have changed my task creation code to look like the following so that I can abort the tasks. I know this is not a good solution, but I don't know what else to do.
while (_tasks.Count < _runningMax)
{
    Task task = factory.StartNew(() =>
    {
        try
        {
            using (_cts.Token.Register(Thread.CurrentThread.Abort))
            {
                FindWork(ct);
            }
        }
        catch (ThreadAbortException)
        {
            return;
        }
    }, _cts.Token);
    _tasks.Add(task.Id, task);
}
Could the following work in your scenario?
Instead of spawning multiple threads and having them all wait on the queue, I would have a single thread in an infinite polling loop and have that one spawn a new thread whenever a new piece of work comes in. You can add a semaphore to limit the number of threads you create. See the sample code below; I've used a BlockingCollection instead of RabbitMQ.
public class QueueManager
{
    public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
    private const int _maxRunningTasks = 3;
    static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);

    public void Queue()
    {
        blockingCollection.Add(new Work());
    }

    public void Consume()
    {
        while (true)
        {
            Work work = blockingCollection.Take();
            _sem.Wait();
            Task t = Task.Factory.StartNew(work.DoWork);
        }
    }

    public class Work
    {
        public void DoWork()
        {
            Thread.Sleep(5000);
            _sem.Release();
            Console.WriteLine("Finished work");
        }
    }
}
And my testing class:
class Test
{
    static void Main(string[] args)
    {
        // The class above is named QueueManager, so instantiate that type
        QueueManager c = new QueueManager();
        Task t = Task.Factory.StartNew(c.Consume);
        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();
        Thread.Sleep(1000);
        Console.ReadLine();
    }
}
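The sample above never stops its polling loop, so on its own it does not answer the "gracefully end" part of the question. Below is a rough sketch of one way to do that, under a few assumptions: `CancellableQueueManager` and `Demo` are made-up names, work items are plain `Action` delegates instead of the `Work` class, and `BlockingCollection` still stands in for RabbitMQ. It relies on the `Take(CancellationToken)` and `Wait(CancellationToken)` overloads, which throw `OperationCanceledException` once the token is cancelled, so the loop can exit without `Thread.Abort`. Whether the same idea carries over to the real queue depends on whether your `ConsumeWithBlocking` wrapper can accept a token or a timeout, which I can't tell from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of QueueManager whose consume loop can be cancelled.
public class CancellableQueueManager
{
    private const int MaxRunningTasks = 3;
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly SemaphoreSlim _sem = new SemaphoreSlim(MaxRunningTasks);

    public void Queue(Action work)
    {
        _queue.Add(work);
    }

    // Blocks until the token is cancelled, then returns normally.
    public void Consume(CancellationToken ct)
    {
        try
        {
            while (true)
            {
                // Reserve a worker slot first, so an item is only dequeued
                // when there is room to run it.
                _sem.Wait(ct);

                Action work;
                try
                {
                    // Blocks while the queue is empty, but wakes up on cancellation.
                    work = _queue.Take(ct);
                }
                catch
                {
                    _sem.Release(); // give the reserved slot back
                    throw;
                }

                Task.Run(() =>
                {
                    // Release in finally so a throwing work item cannot leak a slot.
                    try { work(); }
                    finally { _sem.Release(); }
                });
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested: leave the loop quietly.
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var cts = new CancellationTokenSource();
        var manager = new CancellableQueueManager();

        Task consumer = Task.Factory.StartNew(
            () => manager.Consume(cts.Token), TaskCreationOptions.LongRunning);

        manager.Queue(() => Console.WriteLine("Finished work"));
        Thread.Sleep(500);

        cts.Cancel();    // ask the polling loop to stop
        consumer.Wait(); // returns once Consume has left its loop
    }
}
```

Work items that are already running are not interrupted here; if they are long, they would need to check the same token themselves.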