Tags: rabbitmq, messaging, rabbitmq-exchange, dead-letter

Getting RabbitMQ to behave as I want (dead letter and re-queue on errors)


I have a simple RabbitMQ setup that is currently doing what I want...

It publishes messages based on the type of message. Each type gets its own queue.
When messages are published they sit on the queue even if there is no consumer to consume them (sit there forever if no consumer arrives).
When a consumer is there (there is only one!) it eats the messages.
If for some reason it cannot handle a message (e.g. it gets a sub message before the parent has arrived) it nacks the message back onto the queue.

If it sees the same message six times it nacks the message without re-queueing it.

This all works, but currently after six attempts it drops the message.

What I would like is for the message to pass to a 'dead letter queue' and after some time (say 5 mins) re-queue that message at the end of the particular queue it came from.

I am definitely cargo cult programming and I don't quite understand all the exchange/queue/binding/routing keys and other arcana involved... hand holding is appreciated!

 public void PublishEntity<T>(T message) where T : class, ISendable
    {
        logger.Info($"publishing {message.UniqueId}");

        var factory = new ConnectionFactory
        {
            HostName = appSettings.RabbitHostName,
            UserName = appSettings.RabbitUsername,
            Password = appSettings.RabbitPassword
        };
        try
        {
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine($"Setting up queues for: {typeof(T).Name}");                      

                    channel.QueueDeclare($"App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        null);                        

                    var json = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(json);

                    channel.TxSelect();

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "Id", Guid.NewGuid().ToString() }

                    };

                    channel.BasicPublish("",
                        $"App_{typeof(T).Name}",
                        properties,
                        body);
                    Data.MarkAsSent(message);
                    channel.TxCommit();
                }
            }
        }

ISendable just makes sure the message has some properties used in Data.MarkAsSent(message); to mark in a database where we have got to.

The receiver has a similar lump of code to handle each type. As I say this is working.

What do I need to do to add the dead letter queue things?

My attempt below created the dead letter queues, but nothing ever moves to them.

 public void PublishEntity<T>(T message) where T : class, ISendable
    {
        logger.Info($"publishing {message.UniqueId}");

        var factory = new ConnectionFactory
        {
            HostName = appSettings.RabbitHostName,
            UserName = appSettings.RabbitUsername,
            Password = appSettings.RabbitPassword
        };
        try
        {
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
                    channel.ExchangeDeclare("App.Dead.Letter", "direct", true);

                    var args = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "App.Dead.Letter" },
                        {
                            "x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
                        }
                    };

                    channel.QueueDeclare($"App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        args);

                    channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        null);

                    var json = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(json);

                    channel.TxSelect();

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "Id", Guid.NewGuid().ToString() }

                    };

                    channel.BasicPublish("",
                        $"App_{typeof(T).Name}",
                        properties,
                        body);
                    Data.MarkAsSent(message);
                    channel.TxCommit();
                }
            }
        }

In my receiver I have this magic

              catch (Exception ex)
            {
                var attemptsToHandle = MarkFailedToHandleMessage(logId, ex);
                if (attemptsToHandle > 5)
                {
                    // If we have seen this message many times then don't re-queue.
                    channel.BasicNack(ea.DeliveryTag, false, false);
                    return;
                }

                // Re-queue so we can retry later.
                channel.BasicNack(ea.DeliveryTag, false, true);
                return;
            }
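
A note on the two nacks above: the last argument of `BasicNack` is the re-queue flag. With `false`, a queue that was declared with `x-dead-letter-exchange` hands the message to that exchange instead of dropping it, and the broker records the event in an `x-death` header on the message. The sketch below shows one way a receiver could read that header, for instance to stop retrying after a number of round trips. It assumes the .NET client exposes `x-death` as a list of dictionaries, with string values possibly arriving as `byte[]`. `DeadLetterCount` and `AsText` are hypothetical helper names, not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Sums the "count" fields of the x-death entries recorded for one queue.
// Returns 0 when the message has never been dead-lettered.
static long DeadLetterCount(IDictionary<string, object> headers, string queueName)
{
    if (headers == null || !headers.TryGetValue("x-death", out var raw)) return 0;
    if (!(raw is IEnumerable<object> entries)) return 0;

    long total = 0;
    foreach (var entry in entries)
    {
        if (!(entry is IDictionary<string, object> table)) continue;
        if (!table.TryGetValue("queue", out var queue) || AsText(queue) != queueName) continue;
        if (table.TryGetValue("count", out var count) && count != null)
        {
            total += Convert.ToInt64(count);
        }
    }
    return total;
}

// Header strings may arrive as raw bytes (assumption about the client), so decode them.
static string AsText(object value) =>
    value is byte[] bytes ? Encoding.UTF8.GetString(bytes) : value?.ToString();
```

In the catch block this could be called as `DeadLetterCount(ea.BasicProperties.Headers, queueName)`, where `queueName` is whatever name the receiver consumes from.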

Phew... lot of code. Thanks if you have made it this far....

What are the glaring issues in my code that stop things falling through to the dead letter queue?

And what extra do I need to add so that things in the DLQ bounce back to the main queue after some time? Additionally, this sets up a DLQ for each type's queue... is this required, or should there be a single queue to hold the errored messages?


Solution

  • So I think I got this working as I intend. Hard to test all this stuff though!

            public void PublishEntity<T>(T message) where T : class, ISendable
        {
            logger.Info($"publishing {message.UniqueId}");
    
            var factory = new ConnectionFactory
            {
                HostName = appSettings.RabbitHostName,
                UserName = appSettings.RabbitUsername,
                Password = appSettings.RabbitPassword
            };
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
                        // Declare dead letter queue for this type
                        channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
                        var queueArgs = new Dictionary<string, object>
                        {
                            { "x-dead-letter-exchange", "App" },
                            { "x-dead-letter-routing-key", $"App_{typeof(T).Name}" },
                            { "x-message-ttl", 30000 }
                        };
    
                        channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
                            true,
                            false,
                            false,
                            queueArgs);
                        channel.QueueBind($"DLQ.App_{typeof(T).Name}", "App.Dead.Letter", $"DLQ.App_{typeof(T).Name}", null);
    
                        // Declare queue for this type
                        channel.ExchangeDeclare("App", "direct", true);
                        var args = new Dictionary<string, object>
                        {
                            { "x-dead-letter-exchange", "App.Dead.Letter" },
                            {
                                "x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
                            }
                        };
    
                        channel.QueueDeclare($"App_{typeof(T).Name}",
                            true,
                            false,
                            false,
                            args);
                        channel.QueueBind($"App_{typeof(T).Name}", "App", $"App_{typeof(T).Name}", null);
    

    I added an exchange for my main queues to live in and actually bound the queues to the exchanges. I still don't know why I need to do this, as it was working without this extra complexity... I guess some magic was hiding it from me before?
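
    On the "magic": publishing with an empty exchange name goes to the broker's default exchange, and every queue is bound to it automatically under its own name, which is why the original publisher worked with no bindings at all. A named exchange such as "App.Dead.Letter" has no bindings until `QueueBind` is called, so in the first attempt the dead-lettered messages had nowhere to be routed. Note too that `x-message-ttl` is in milliseconds, so 30000 holds a message for 30 seconds rather than the 5 minutes mentioned in the question. Separately, if a queue already exists with different arguments it probably has to be deleted first, as the broker rejects a redeclaration whose arguments do not match.

    A minimal sketch of the two argument sets with the delay made explicit. The helper names `WorkQueueArguments` and `HoldingQueueArguments` are made up for illustration; the exchange and queue names follow the code above.

```csharp
using System;
using System.Collections.Generic;

// Arguments for the work queue "App_<type>": messages nacked without re-queue
// are sent to the dead letter exchange with the holding queue's routing key.
static Dictionary<string, object> WorkQueueArguments(string typeName) =>
    new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", "App.Dead.Letter" },
        { "x-dead-letter-routing-key", $"DLQ.App_{typeName}" }
    };

// Arguments for the holding queue "DLQ.App_<type>": nothing consumes from it,
// so each message waits for the TTL and is then dead-lettered back to "App".
static Dictionary<string, object> HoldingQueueArguments(string typeName, TimeSpan delay) =>
    new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", "App" },
        { "x-dead-letter-routing-key", $"App_{typeName}" },
        { "x-message-ttl", (int)delay.TotalMilliseconds }
    };

// Intended use, mirroring the declarations above (name = typeof(T).Name):
//   channel.QueueDeclare($"DLQ.App_{name}", true, false, false,
//       HoldingQueueArguments(name, TimeSpan.FromMinutes(5)));
//   channel.QueueBind($"DLQ.App_{name}", "App.Dead.Letter", $"DLQ.App_{name}", null);
//   channel.QueueDeclare($"App_{name}", true, false, false, WorkQueueArguments(name));
//   channel.QueueBind($"App_{name}", "App", $"App_{name}", null);
```

    Keeping one holding queue per type, as above, lets each one carry its own return routing key. A single shared holding queue would need some other way to remember where each message came from.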