Tags: c#, msmq, thread-abort

Stopping a service that reads MSMQ


I'm a Java programmer who has been asked to make some changes to C# applications. I've been working with C# for a week now, and I've finally hit a point where looking at the documentation isn't helping and I can't find solutions when I google.

In this case I have a Windows Service that processes messages arriving in an MSMQ queue. When a message is received, the currently listening thread picks it up and goes off to do an operation that takes a couple of seconds.

public void Start()
{
    this.listen = true;
    for (int i = 0; i < Constants.ThreadMaxCount; i++)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.StartListening), i);
    }
    ...

private void StartListening(Object threadContext)
{

    int threadId = (int)threadContext;
    threads[threadId] = Thread.CurrentThread;
    PostRequest postReq;
    while(this.listen)
    {
        System.Threading.Monitor.Enter(locker);
        try
        {

            postReq = GettingAMessage();
        }
        finally
        {
            System.Threading.Monitor.Exit(locker);
        }
    }
    ...
}

GettingAMessage() has the following lines that listen for a message:

Task<Message> ts = Task.Factory.FromAsync<Message>
    (queue.BeginReceive(), queue.EndReceive);
ts.Wait();

The problem is that when Stop() is called and no messages are arriving in the queue, the threads all sit there waiting for a message. I have tried using timeouts, but that approach doesn't seem elegant to me (and having switched over to the Task factory, I'm not sure how to implement them now). My solution was to store a reference to each thread in an array so that I could cancel them. Each worker thread runs the following line after being created:

threads[threadId] = Thread.CurrentThread;

and each thread is then supposed to be aborted by:

public void Stop()
{
    try
    {
        this.listen = false;
        foreach(Thread a in threads) {
            a.Abort();
        }
    }
    catch
    {...}
}

Any advice on why this isn't shutting the threads down? (Or even better, can anyone tell me where I should look for how to cancel the ts.Wait() properly?)


Solution

  • Use the ManualResetEvent class to stop your running threads gracefully: Stop() signals the event, and each thread checks it and exits on its own.

    In addition, don't use the ThreadPool for long-running work; create your own threads instead. With many long-running tasks on the pool you can end up with thread-pool starvation, possibly even deadlock:

    public class MsmqListener
    {
        private ManualResetEvent _stopRequested = new ManualResetEvent(false);
        private List<Thread> _listenerThreads;
        private object _locker = new object();
    
        //-----------------------------------------------------------------------------------------------------
    
        public MsmqListener()
        {
            CreateListenerThreads();
        }
    
        //-----------------------------------------------------------------------------------------------------
    
        public void Start()
        {
          StartListenerThreads();
        }
    
        //-----------------------------------------------------------------------------------------------------
    
        public void Stop()
        {
            try
            {
                _stopRequested.Set();
                foreach(Thread thread in _listenerThreads)
                {
                    thread.Join(); // Wait for all threads to complete gracefully
                }
            }
            catch( Exception ex)
            {...}
        }
    
        //-----------------------------------------------------------------------------------------------------
    
        private void StartListening()
        {
            PostRequest postReq;
            // WaitOne(0) polls without blocking: it returns true once Stop() has signaled the event
            while( !_stopRequested.WaitOne(0) )
            {
                lock( _locker )
                {
                    postReq = GettingAMessage();
                }
                ...
        }
    
        //-----------------------------------------------------------------------------------------------------
    
        private void CreateListenerThreads()
        {
            _listenerThreads = new List<Thread>();
            for (int i = 0; i < Constants.ThreadMaxCount; i++)
            {
                var listenerThread = new Thread(StartListening);
                _listenerThreads.Add(listenerThread);
            }
        }
    
        //-----------------------------------------------------------------------------------------------------
    
        private void StartListenerThreads()
        {
            foreach(var thread in _listenerThreads)
            {
                thread.Start();
            }
        }
    }
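
    One caveat with the loop above: the stop event is only tested between calls to GettingAMessage(), so a receive that blocks indefinitely would still hold up Stop(). A bounded wait avoids that. The following is a minimal sketch, not the original service: TimedListener, Post and Processed are hypothetical names, and a BlockingCollection<string> stands in for the MSMQ queue so the example runs on its own. With a real queue, MessageQueue.Receive(TimeSpan) should play the same role; it throws a MessageQueueException (error code IOTimeout) when the timeout expires.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    // Hypothetical stand-in for the listener; BlockingCollection<string>
    // replaces the MSMQ queue purely for illustration.
    public class TimedListener
    {
        private readonly ManualResetEvent _stopRequested = new ManualResetEvent(false);
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
        private readonly Thread _worker;
        private int _processed;

        // Number of messages taken from the queue so far.
        public int Processed => Volatile.Read(ref _processed);

        public TimedListener()
        {
            _worker = new Thread(Listen);
        }

        public void Start() => _worker.Start();

        public void Post(string message) => _queue.Add(message);

        public void Stop()
        {
            _stopRequested.Set();   // ask the worker to finish
            _worker.Join();         // returns within roughly one receive timeout
        }

        private void Listen()
        {
            while (!_stopRequested.WaitOne(0))
            {
                // Wait at most 200 ms for a message, then loop to re-check the stop event.
                if (_queue.TryTake(out string message, TimeSpan.FromMilliseconds(200)))
                {
                    Interlocked.Increment(ref _processed);
                }
            }
        }
    }
    ```

    The timeout trades shutdown latency against wake-ups on an idle queue: a shorter value makes Stop() return sooner, a longer one polls less often.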
    

    UPDATE: I replaced AutoResetEvent with ManualResetEvent in order to support stopping multiple waiting threads. With ManualResetEvent, once you signal, all waiting threads are notified and are free to proceed with their job (in your case, to stop polling for messages).
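
    The difference between the two event types can be seen in a small experiment. This is an illustrative sketch only; ResetEventDemo and CountReleased are hypothetical names. It starts several threads that wait on the same event, signals once, and counts how many of them observed the signal before timing out.

    ```csharp
    using System;
    using System.Threading;

    public static class ResetEventDemo
    {
        // Starts `waiters` threads that each wait on `stopEvent`, signals it once,
        // and returns how many threads saw the signal before their timeout expired.
        public static int CountReleased(EventWaitHandle stopEvent, int waiters)
        {
            int released = 0;
            var threads = new Thread[waiters];
            for (int i = 0; i < waiters; i++)
            {
                threads[i] = new Thread(() =>
                {
                    if (stopEvent.WaitOne(TimeSpan.FromMilliseconds(500)))
                    {
                        Interlocked.Increment(ref released);
                    }
                });
                threads[i].Start();
            }

            stopEvent.Set();   // a single signal

            foreach (Thread t in threads)
            {
                t.Join();
            }
            return released;
        }
    }
    ```

    Calling ResetEventDemo.CountReleased(new ManualResetEvent(false), 3) returns 3, because the event stays signaled for every waiter. The same call with new AutoResetEvent(false) returns 1, because the event resets itself as soon as one thread consumes the signal.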

    Using a volatile bool does not provide all the guarantees; a thread may still read stale data. It is better to use the underlying OS synchronisation mechanism, as it provides much stronger guarantees. Source: stackoverflow.com/a/11953661/952310
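
    On the question's last point, cancelling ts.Wait(): Task.Wait has an overload that takes a CancellationToken and throws OperationCanceledException once the token is cancelled. Below is a minimal sketch under that assumption; CancellableWaitDemo and WaitOrStop are hypothetical names, and the token would come from a CancellationTokenSource that Stop() cancels.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class CancellableWaitDemo
    {
        // Waits for `pending` to finish, but gives up as soon as `stopToken` is cancelled.
        // Returns true if the task completed, false if the wait was cancelled.
        public static bool WaitOrStop(Task pending, CancellationToken stopToken)
        {
            try
            {
                pending.Wait(stopToken);
                return true;
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }
    }
    ```

    Note that this only abandons the wait. The underlying asynchronous receive is presumably still outstanding, so a later message could be picked up by it; a receive with a timeout avoids that problem.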