Tags: c#, multithreading, task-parallel-library, rabbitmq, queueing

What is the best way to cancel a task that is in a blocking state?


I have tasks running that call a method that reads from RabbitMQ. When there is nothing in the queue, the method simply blocks. So the tasks have a "running" status, but aren't actually doing anything. Is there any way to gracefully end these tasks?

The code that accesses the queue is as follows:

    private void FindWork(CancellationToken ct)
    {
        if (ct.IsCancellationRequested)
            return;

        bool result = false;
        bool process = false;
        bool queueResult = false;
        Work_Work work = null;

        try
        {
            using (Queue workQueue = new Queue(_workQueue))
            {
                // Look for work on the work queue
                workQueue.Open(Queue.Mode.Consume);

                // Blocks here until a message arrives
                work = workQueue.ConsumeWithBlocking<Work_Work>();

                // Do some work with the message ...

                return;

                // ... (rest of the method not shown)

The tasks are created as follows:

    private void Run()
    {
        while (!_stop)
        {
            // Remove any stopped tasks from the pool
            List<int> removeThreads = new List<int>();

            lock (_tasks)
            {
                foreach (KeyValuePair<int, Task> task in _tasks)
                {
                    if (task.Value.Status != TaskStatus.Running)
                    {
                        task.Value.Wait();
                        removeThreads.Add(task.Value.Id);
                    }
                }

                foreach (int taskID in removeThreads)
                    _tasks.Remove(taskID);
            }

            CancellationToken ct = _cts.Token;
            TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);

            // Create new tasks if we have room in the pool
            while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() => FindWork(ct));

                lock (_tasks)
                    _tasks.Add(task.Id, task);
            }

            // Take a rest so we don't run the CPU to death
            Thread.Sleep(1000);
        }
    }

Currently I have changed my task creation code to look like the following so that I can abort the tasks. I know this is not a good solution, but I don't know what else to do.

    while (_tasks.Count < _runningMax)
    {
        Task task = factory.StartNew(() =>
            {
                try
                {
                    using (_cts.Token.Register(Thread.CurrentThread.Abort))
                    {
                        FindWork(ct);
                    }
                }
                catch (ThreadAbortException)
                {
                    return;
                }
            }, _cts.Token);

        _tasks.Add(task.Id, task);
    }

Solution

  • Could the following work in your scenario?

    Instead of spawning multiple threads that all block waiting on the queue, I would have a single thread in an infinite polling loop that spawns a new task whenever a new piece of work comes in. You can add a semaphore to limit the number of tasks running at once. See the sample code below; I've used a BlockingCollection in place of RabbitMQ.

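    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class QueueManager
    {
        public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
        private const int _maxRunningTasks = 3;

        // Limits how many work items run at the same time
        static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);

        public void Queue()
        {
            blockingCollection.Add(new Work());
        }

        public void Consume()
        {
            while (true)
            {
                // Blocks until an item is available
                Work work = blockingCollection.Take();

                // Blocks until one of the running tasks releases a slot
                _sem.Wait();

                Task t = Task.Factory.StartNew(work.DoWork);
            }
        }

        public class Work
        {
            public void DoWork()
            {
                try
                {
                    Thread.Sleep(5000); // Simulate some work
                    Console.WriteLine("Finished work");
                }
                finally
                {
                    // Always free the slot, even if the work throws
                    _sem.Release();
                }
            }
        }
    }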
    

    And my test class:

    class Test
        {
            static void Main(string[] args)
            {
                QueueManager c = new QueueManager();
                Task t = Task.Factory.StartNew(c.Consume);
    
                c.Queue();
                c.Queue();
                c.Queue();
                c.Queue();
                c.Queue();
    
                Thread.Sleep(1000);
                Console.ReadLine();
            }
        }
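
    The consumer loop above still blocks forever in Take(), which is the original problem. A minimal sketch of one way to make that wait cancellable is shown here, again with a BlockingCollection standing in for RabbitMQ. The class CancellableConsumerDemo and its Consume helper are hypothetical names used only for illustration. The sketch relies on the BlockingCollection<T>.Take(CancellationToken) overload, which throws OperationCanceledException when the token is cancelled. Whether the same idea carries over to the Queue wrapper in the question depends on whether that wrapper can expose a cancellable or timed consume call, which is an assumption here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableConsumerDemo
{
    // Hypothetical helper: consumes items until the token is cancelled
    // and returns how many items were processed.
    public static int Consume(BlockingCollection<string> queue, CancellationToken ct)
    {
        int processed = 0;
        try
        {
            while (true)
            {
                // Take(ct) blocks while the queue is empty, but throws
                // OperationCanceledException once ct is cancelled.
                string item = queue.Take(ct);
                Console.WriteLine("Processing " + item);
                processed++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown: fall through and return normally.
        }
        return processed;
    }

    public static void Main()
    {
        using (var queue = new BlockingCollection<string>())
        using (var cts = new CancellationTokenSource())
        {
            Task<int> consumer = Task.Factory.StartNew(
                () => Consume(queue, cts.Token),
                TaskCreationOptions.LongRunning);

            queue.Add("job 1");
            queue.Add("job 2");

            Thread.Sleep(500); // give the consumer time to drain the queue
            cts.Cancel();      // unblocks the pending Take(ct)

            consumer.Wait();   // completes normally; no thread abort needed
            Console.WriteLine("Consumer stopped after " + consumer.Result + " items");
        }
    }
}
```

    Because the OperationCanceledException is caught inside the loop, the task ends in the RanToCompletion state and Wait() does not throw. If the real queue client offers only a timed receive rather than a token-aware one, the same shape probably works by polling with a short timeout and checking ct.IsCancellationRequested between attempts.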