Search code examples
c#rabbitmqpublish-subscribeeasynetq

Re-queue message on exception


I'm looking for a solid way of re-queuing messages that couldn't be handled properly - at this time.

I've been looking at http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/ and it seems that it's supported to requeue messages in the RabbitMQ API.

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

However I'm using EasyNetQ. So wondering how I would do something similar here.

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

Is this even a good approach? Not ACK the message could cause problems if do work exceeds the wait for ACK timeout by the RabbitMQ server.


Solution

  • So I came up with this solution. Which replaces the default error strategy by EasyNetQ.

    public class DeadLetterStrategy : DefaultConsumerErrorStrategy
    {
        public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
        : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
        {
        }
    
        public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
        {
            object deathHeaderObject;
            if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
                return AckStrategies.NackWithoutRequeue;
    
            var deathHeaders = deathHeaderObject as IList;
    
            if (deathHeaders == null)
                return AckStrategies.NackWithoutRequeue;
    
            var retries = 0;
            foreach (IDictionary header in deathHeaders)
            {
                var count = int.Parse(header["count"].ToString());
                retries += count;
            }
    
            if (retries < 3)
                return AckStrategies.NackWithoutRequeue;
            return base.HandleConsumerError(context, exception);
        }
    }
    

    You replace it like this:

    RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
    

    You have to use the AdvancedBus so you have to setup everything up manually.

    using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
    {
        var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
        var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
        var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
        bus.Advanced.Bind(deadExchange, queue, "");
        bus.Advanced.Bind(textExchange, queue, "");
    
        bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
    }
    

    This will dead letter a failed message 3 times. After that it'll go to the default error queue provided by EasyNetQ for error handling. You can subscribe to that queue.

    A message is dead lettered when an exception propagates out of your consumer method. So this would trigger a dead letter.

    static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
    {
        throw new Exception("This is a test!");
    }