Search code examples
c#multithreadingexceptionrabbitmqamqp

Application crashes & exits debugger while consumer is running & rabbit mq service restarts


I have a C# application in which I am trying to test the resiliency of my RabbitMQ client. While the consumer is running, I stop the RabbitMQ service to see how my consumer handles it.

I have try/catch blocks around almost all of my consumer code, yet, possibly because of an exception on a background thread, my application prints the following in the Output window and then exits the debugger.

The thread 'AMQP Connection amqp://test.com:5671' (0x6da18) has exited with code 0 (0x0).

A first chance exception of type 'System.Net.WebException' occurred in System.dll

And then it exits the debugger. The only thing I notice is that the destructor (finalizer) of my consumer class is called before the process exits.

using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;

namespace RabbitMQClient
{
  public class MessageQueueConsumer : IHealthVerifiable
  {
    /// <summary>
    /// Thrown by AttemptDequeue when a timed dequeue returns no message within the requested timeout.
    /// </summary>
    /// <remarks>
    /// Shares its simple name with System.TimeoutException; inside MessageQueueConsumer the
    /// unqualified name resolves to this nested type.
    /// </remarks>
    public class TimeoutException : Exception { }

    // Have to do this because, somehow, SharedQueue implementation of IEnumerable is faulty
    // Count() method hangs, and never returns
    private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      /// <summary>
      /// Returns the number of deliveries currently held in the queue without waiting for any to arrive.
      /// </summary>
      /// <remarks>
      /// Being an instance method, this is chosen over the LINQ Count() extension, which would
      /// enumerate the queue instead of reading its size.
      /// The read is taken under the monitor on m_queue so it does not race with deliveries being
      /// enqueued from the connection thread.
      /// NOTE(review): assumes SharedQueue synchronises on m_queue itself - confirm against the
      /// client library version in use.
      /// </remarks>
      public int Count()
      {
        lock (m_queue)
        {
          return m_queue.Count;
        }
      }
    }

    // Acknowledgement batch size used when the constructor is not given one.
    private const int DEFAULT_ACK_COUNT = 1000;

    // Connection URI assigned to ConnectionFactory.Uri in InitConsumer.
    private String connString;
    // Consumer registered on the channel; AttemptDequeue reads deliveries from its Queue.
    private QueueingBasicConsumer consumer;
    // Current broker connection; replaced every time InitConsumer runs.
    private IConnection conn;
    // Channel created from conn; used for BasicQos, BasicConsume and BasicAck.
    private IModel channel;
    // Name of the queue passed to BasicConsume.
    private String queueName;
    // Queue handed to the consumer in InitConsumer; its Count() drives the ack decision in AttemptDequeue.
    private BufferQueue buffer;
    // Held by Dequeue for the whole call. IsHealthy does not take this lock.
    private Object locker = new Object();
    // Prefetch limit passed to BasicQos; computed from ackCount in the constructor.
    private ushort prefetchCount;
    // Buffer size at which AttemptDequeue sends a cumulative acknowledgement.
    private ushort ackCount;

    /// <summary>
    /// Creates a consumer for <paramref name="queueName"/> and connects immediately via InitConsumer.
    /// </summary>
    /// <param name="queueName">Queue to consume from.</param>
    /// <param name="connString">Connection URI handed to the connection factory.</param>
    /// <param name="ackCount">Acknowledgement batch size; DEFAULT_ACK_COUNT when null.</param>
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      this.ackCount = ackCount ?? (ushort)DEFAULT_ACK_COUNT;

      // Prefetch is twice the ack batch. The product is an int; clamp it before narrowing, because the
      // plain (ushort) cast wrapped around for ackCount > 32767 and produced a prefetch limit smaller
      // than the batch size (or 0, which the broker treats as "no limit").
      this.prefetchCount = (ushort)Math.Min(this.ackCount * 2, ushort.MaxValue);

      InitConsumer();
    }

    // Finalizer: calls Close() as a last-chance cleanup when the instance is collected
    // without Close() having been called explicitly.
    // NOTE(review): this runs on the finalizer thread at an unpredictable time and touches other
    // managed objects (channel, conn) that may already have been finalized; an explicit
    // Close()/IDisposable pattern is the usual way to release these - confirm and consider replacing.
    ~MessageQueueConsumer()
    {
      Close();
    }

    /// <summary>
    /// Closes the channel and then the connection. Best effort: failures are ignored because
    /// either object may already be closed or may never have been created.
    /// </summary>
    public void Close()
    {
      // The two shutdowns are attempted independently. With a single try block, an exception from
      // channel.Close (or a null channel) skipped conn.Close and left the connection open.
      try
      {
        if (channel != null)
          channel.Close(200, queueName + " Goodbye");
      }
      catch (Exception) { } // channel already closed or connection gone - nothing left to do

      try
      {
        if (conn != null)
          conn.Close();
      }
      catch (Exception) { } // connection already closed - nothing left to do
    }

    /// <summary>
    /// (Re)creates the connection, channel, buffer and consumer and starts consuming from queueName.
    /// Keeps retrying until it succeeds, so it blocks the caller while the broker is unreachable.
    /// </summary>
    /// <remarks>
    /// Retries are done in a loop with a pause between attempts. Retrying by calling this method
    /// recursively from the catch block, with no delay, fails instantly on every attempt while the
    /// broker is down; the recursion then exhausts the stack, and a StackOverflowException cannot be
    /// caught - it terminates the process.
    /// </remarks>
    private void InitConsumer()
    {
      const int retryDelayMs = 10000; // same pause Dequeue uses before reconnecting

      while (true)
      {
        // Drop what is left of the previous (or half-built) connection so reconnects do not pile up sockets.
        ReleaseStaleConnection();

        try
        {
          ConnectionFactory factory = new ConnectionFactory();
          factory.Uri = connString;
          conn = factory.CreateConnection();
          channel = conn.CreateModel();
          channel.BasicQos(0, prefetchCount, false);
          buffer = new BufferQueue();

          consumer = new QueueingBasicConsumer(channel, buffer);
          channel.BasicConsume(queueName, false, consumer);
          return;
        }
        catch (Exception)
        {
          // Broker unreachable or setup rejected: wait, then try again from scratch.
          Thread.Sleep(retryDelayMs);
        }
      }
    }

    /// <summary>
    /// Aborts the connection currently held in conn, if any, ignoring failures.
    /// Aborting a connection also shuts down the channels opened on it.
    /// </summary>
    private void ReleaseStaleConnection()
    {
      IConnection stale = conn;
      conn = null;
      if (stale == null)
        return;

      try
      {
        stale.Abort();
      }
      catch (Exception) { } // best effort: the connection is being discarded anyway
    }

    /// <summary>
    /// Get the next event from the queue. On a connection-level failure the consumer is
    /// re-initialised and the dequeue is attempted one more time.
    /// </summary>
    /// <param name="timeout">Maximum wait in milliseconds; null waits until a message arrives.</param>
    /// <returns>Event</returns>
    /// <exception cref="TimeoutException">No message arrived within <paramref name="timeout"/>.</exception>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        bool pauseBeforeReconnect;

        try
        {
          return AttemptDequeue(timeout);
        }
        catch (TimeoutException)
        {
          // An elapsed timeout is a normal outcome, not a connection fault. Without this clause the
          // catch-all below treated it as one: 10 s sleep, full reconnect, second dequeue.
          throw;
        }
        catch (SerializationException)
        {
          // A bad message body is not fixed by reconnecting either.
          throw;
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          pauseBeforeReconnect = false;
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          pauseBeforeReconnect = false;
        }
        catch (Exception)
        {
          // Problems connecting to the queue (ConnectFailureException and anything else):
          // wait 10sec, then try again.
          pauseBeforeReconnect = true;
        }

        if (pauseBeforeReconnect)
          Thread.Sleep(10000);

        // Attempt to reopen and try again. A failure of this second attempt propagates to the caller.
        InitConsumer();
        return AttemptDequeue(timeout);
      }
    }

    /// <summary>
    /// Takes one delivery from the consumer's queue, acknowledges in batches and returns the payload.
    /// </summary>
    /// <param name="tomeout">Wait limit handed to the timed dequeue; null blocks until a delivery arrives.</param>
    /// <returns>Body of the dequeued delivery.</returns>
    private byte[] AttemptDequeue(int? tomeout)
    {
      BasicDeliverEventArgs delivery;

      if (tomeout.HasValue)
      {
        bool received = consumer.Queue.Dequeue(tomeout.Value, out delivery);
        if (!received)
          throw new TimeoutException();
      }
      else
      {
        delivery = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
      }

      // Cumulative ack (multiple = true) once the buffer is drained or holds exactly ackCount items.
      bool drained = buffer.Count() == 0;
      if (drained || buffer.Count() == ackCount)
        channel.BasicAck(delivery.DeliveryTag, true);

      try
      {
        return delivery.Body;
      }
      catch (Exception cause)
      {
        throw new SerializationException("Error deserializing queued message:", cause);
      }
    }

    /// <summary>
    /// Reports whether the channel is usable, re-initialising the consumer first when the
    /// channel is not open.
    /// </summary>
    /// <returns>true if the channel is open or was re-created; false if any step threw</returns>
    public bool IsHealthy()
    {
      try
      {
        // A closed channel triggers a reconnect; reaching the return means it did not throw.
        if (!channel.IsOpen)
          InitConsumer();

        return true;
      }
      catch
      {
        return false;
      }
    }
  }
}

Any ideas how I can catch this exception and attempt to retry connection?


Solution

  • The problem was using QueueingBasicConsumer, which does not implement any recovery mechanism. I changed to EventingBasicConsumer, and recovery from a failure worked.

    namespace RabbitMQClient {   public class MessageQueueConsumer : IHealthVerifiable   {
        // Consumer-specific timeout exception. Inside MessageQueueConsumer the unqualified name
        // TimeoutException resolves to this nested type rather than System.TimeoutException.
        public class TimeoutException : Exception { }
    
    private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      /// <summary>
      /// Returns the number of deliveries currently held in the queue without waiting for any to arrive.
      /// </summary>
      /// <remarks>
      /// The Received handler enqueues from the client's delivery thread while Dequeue reads from the
      /// caller's thread, so the size is read under the monitor on m_queue.
      /// NOTE(review): assumes SharedQueue synchronises on m_queue itself - confirm against the
      /// client library version in use.
      /// </remarks>
      public int Count()
      {
        lock (m_queue)
        {
          return m_queue.Count;
        }
      }
    }
    
    // Acknowledgement batch size used when the constructor is not given one; the Received
    // handler also compares the buffer size against it before enqueuing.
    private const int DEFAULT_ACK_COUNT = 1000;
    
    // Connection URI assigned to ConnectionFactory.Uri in InitConsumer.
    private String connString;
    // Event-based consumer; its Received handler is attached in InitConsumer.
    private EventingBasicConsumer consumer;
    // Current broker connection; replaced every time InitConsumer runs.
    private IConnection conn;
    // Channel created from conn; used for BasicQos, BasicConsume and BasicAck.
    private IModel channel;
    // Name of the queue passed to BasicConsume.
    private String queueName;
    // Local queue filled by the Received handler and drained by AttemptDequeue.
    private BufferQueue buffer;
    // Held by Dequeue for the whole call. IsHealthy does not take this lock.
    private Object locker = new Object();
    // Prefetch limit passed to BasicQos; computed from ackCount in the constructor.
    private ushort prefetchCount;
    // Batch size the Received handler uses to decide when to acknowledge.
    private ushort ackCount;
    
    /// <summary>
    /// Creates a consumer for <paramref name="queueName"/> and connects immediately via InitConsumer.
    /// </summary>
    /// <param name="queueName">Queue to consume from.</param>
    /// <param name="connString">Connection URI handed to the connection factory.</param>
    /// <param name="ackCount">Acknowledgement batch size; DEFAULT_ACK_COUNT when null.</param>
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      this.ackCount = ackCount ?? (ushort)DEFAULT_ACK_COUNT;

      // Prefetch is twice the ack batch. The product is an int; clamp it before narrowing, because the
      // plain (ushort) cast wrapped around for ackCount > 32767 and produced a prefetch limit smaller
      // than the batch size (or 0, which the broker treats as "no limit").
      this.prefetchCount = (ushort)Math.Min(this.ackCount * 2, ushort.MaxValue);

      InitConsumer();
    }
    
    // Finalizer: calls Close() as a last-chance cleanup when the instance is collected
    // without Close() having been called explicitly.
    // NOTE(review): this runs on the finalizer thread at an unpredictable time and touches another
    // managed object (channel) that may already have been finalized; an explicit
    // Close()/IDisposable pattern is the usual way to release it - confirm and consider replacing.
    ~MessageQueueConsumer()
    {
      Close();
    }
    
    /// <summary>
    /// Closes the channel with reply code 200 and a farewell text naming the queue.
    /// Any failure is ignored.
    /// </summary>
    /// <remarks>
    /// Only the channel is closed; the call that would close the connection is disabled.
    /// NOTE(review): confirm whether the connection is meant to stay open after Close().
    /// </remarks>
    public void Close()
    {
      try
      {
        String farewell = queueName + " Goodbye";
        channel.Close(200, farewell);
      }
      catch
      {
        // if already closed, do nothing
      }
    }
    
    /// <summary>
    /// (Re)creates the connection, channel, buffer and consumer and starts consuming from queueName.
    /// Any exception from the client library propagates to the caller.
    /// </summary>
    private void InitConsumer()
    {
      // Release the connection being replaced. Otherwise every reconnect leaves a connection behind,
      // and its consumer may keep receiving deliveries alongside the new one.
      IConnection stale = conn;
      if (stale != null)
      {
        try
        {
          stale.Abort();
        }
        catch (Exception) { } // best effort: the connection is being discarded anyway
      }

      ConnectionFactory factory = new ConnectionFactory();
      factory.Uri = connString;
      conn = factory.CreateConnection();
      channel = conn.CreateModel();
      channel.BasicQos(0, prefetchCount, false);
      buffer = new BufferQueue();

      // The handler works on this generation's channel and buffer, not on the fields, so a handler
      // belonging to an older consumer can never acknowledge its delivery tags on a newer channel.
      IModel activeChannel = channel;
      BufferQueue activeBuffer = buffer;
      int unacked = 0; // deliveries received since the last acknowledgement

      consumer = new EventingBasicConsumer(activeChannel);

      // when message is received do following
      consumer.Received += (model, message) =>
      {
        // Crude back-pressure: pause the delivery thread when the local buffer is already large.
        if (activeBuffer.Count() > DEFAULT_ACK_COUNT)
          Thread.Sleep(3000);

        activeBuffer.Enqueue(message);

        // Acknowledge cumulatively once per ackCount deliveries. Keying the ack on the buffer size
        // being exactly ackCount may never fire when the reader keeps up, and the broker then stops
        // delivering once the prefetch limit of unacknowledged messages is reached.
        // NOTE(review): assumes deliveries for one channel are dispatched one at a time - confirm.
        unacked++;
        if (unacked >= ackCount)
        {
          activeChannel.BasicAck(message.DeliveryTag, true);
          unacked = 0;
        }
      };

      // Start consuming only after the handler is attached, so the first deliveries are not missed.
      channel.BasicConsume(queueName, false, consumer);
    }
    
    /// <summary>
    /// Get the next event from the queue; on a connection-level failure the consumer is
    /// re-initialised and the dequeue is attempted once more.
    /// </summary>
    /// <returns>Event</returns>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        bool pauseBeforeReconnect;

        try
        {
          return AttemptDequeue(timeout);
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          pauseBeforeReconnect = false;
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          pauseBeforeReconnect = false;
        }
        catch (ConnectFailureException)
        {
          // Problems connecting to the queue, wait 10sec, then try again.
          pauseBeforeReconnect = true;
        }

        if (pauseBeforeReconnect)
          Thread.Sleep(10000);

        // Attempt to reopen and try again
        InitConsumer();
        return AttemptDequeue(timeout);
      }
    }
    
    /// <summary>
    /// Waits for the next buffered delivery and returns its body.
    /// </summary>
    /// <param name="tomeout">Maximum wait in milliseconds; null waits until a delivery arrives.</param>
    /// <returns>Body of the dequeued delivery.</returns>
    /// <exception cref="TimeoutException">No delivery arrived within <paramref name="tomeout"/>.</exception>
    private byte[] AttemptDequeue(int? tomeout)
    {
      // Upper bound for a single wait. The buffer field is re-read on every pass, so a buffer
      // swapped in by InitConsumer is picked up after at most this long.
      const int pollIntervalMs = 3000;

      // The timeout used to be ignored, so a timed Dequeue blocked for as long as the buffer stayed empty.
      DateTime? deadline = null;
      if (tomeout.HasValue)
        deadline = DateTime.UtcNow.AddMilliseconds(tomeout.Value);

      BasicDeliverEventArgs message;

      while (true)
      {
        int waitMs = pollIntervalMs;
        if (deadline.HasValue)
        {
          double remainingMs = (deadline.Value - DateTime.UtcNow).TotalMilliseconds;
          waitMs = (int)Math.Ceiling(Math.Max(0, Math.Min(pollIntervalMs, remainingMs)));
        }

        // The timed dequeue returns as soon as a delivery is enqueued, instead of sleeping a fixed
        // three seconds between checks of the buffer size.
        if (buffer.Dequeue(waitMs, out message))
          break;

        if (deadline.HasValue && DateTime.UtcNow >= deadline.Value)
          throw new TimeoutException();
      }

      try
      {
        return message.Body;
      }
      catch (Exception e)
      {
        throw new SerializationException("Error deserializing queued message:", e);
      }
    }
    
    /// <summary>
    /// Reports whether the channel is usable, re-initialising the consumer first when the
    /// channel is not open.
    /// </summary>
    /// <returns>true if the channel is open or was re-created; false if any step threw</returns>
    public bool IsHealthy()
    {
      try
      {
        // A closed channel triggers a reconnect; reaching the return means it did not throw.
        if (!channel.IsOpen)
          InitConsumer();

        return true;
      }
      catch
      {
        return false;
      }
    }   } }