Tags: rabbitmq, ttl, dead-letter

RabbitMQ - routing messages after they reach their expiration time


I recently found out about the RabbitMQ feature that lets you delay messages. It works great, but I couldn't find any examples similar to what I need:

Let's say there are 3 types of messages: A, B and C. We've got 2 delay_queues with x-message-ttl values of 1 hour and 2 hours. There are also 3 destination_queues, one for each message type.

What I would like to achieve: once a message in one of the delay_queues reaches its TTL, it should be routed to one of the destination_queues depending on its type.

Is this even possible using RabbitMQ message properties? Any ideas? Here is my code, which sends messages to the delay queue (after expiration they're sent to the hello queue):

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

# Destination queue: expired messages end up here.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

channel.queue_bind(exchange='amq.direct',
                   queue='hello',
                   routing_key='hello')

delay_channel = connection.channel()
delay_channel.confirm_delivery()

# Delay queue: messages wait here for the TTL, then are dead-lettered
# to amq.direct with the routing key 'hello'.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 3600000,  # 1 hour, in milliseconds
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})

# Publish through the default exchange, which routes by queue name.
while True:
    delay_channel.basic_publish(exchange='',
                                routing_key='hello_delay',
                                body='test',
                                properties=pika.BasicProperties(delivery_mode=2))
    print("Sent to delay queue")

Solution

  • Okay, so I've managed to find a solution. I'm not sure it's the best one, but it works.

    1. I created two exchanges: DELAY_EXCHANGE and ROUTER_EXCHANGE.
    2. I bound the delay_queue to DELAY_EXCHANGE (with all of the routing_keys I use).
    3. The delay_queue is declared with x-dead-letter-exchange: ROUTER_EXCHANGE and x-message-ttl: 14000 (milliseconds).
    4. I bound each destination queue (A, B, C) to ROUTER_EXCHANGE with its corresponding routing_key.
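The four steps above could be declared with pika roughly as follows. This is only a sketch: the exchange names, the destination queue names (queue_A, queue_B, queue_C), the routing keys helloA and helloB, and the use of direct, durable exchanges are assumptions for illustration; only helloC and the TTL of 14000 appear in this answer.

```python
# Sketch only. All names below are illustrative assumptions, except the
# routing key 'helloC' and the TTL of 14000 ms, which appear in the answer.
DESTINATIONS = {"helloA": "queue_A", "helloB": "queue_B", "helloC": "queue_C"}


def declare_delay_topology(channel, ttl_ms=14000,
                           delay_exchange="delay_exchange",
                           delay_queue="delay_queue",
                           router_exchange="router_exchange",
                           destinations=DESTINATIONS):
    """Declare the exchanges, queues and bindings from steps 1-4."""
    # Step 1: the two exchanges (assumed to be direct exchanges).
    channel.exchange_declare(exchange=delay_exchange,
                             exchange_type="direct", durable=True)
    channel.exchange_declare(exchange=router_exchange,
                             exchange_type="direct", durable=True)

    # Step 3: the delay queue. No x-dead-letter-routing-key is set, so an
    # expired message keeps the routing key it was published with.
    channel.queue_declare(queue=delay_queue, durable=True, arguments={
        "x-message-ttl": ttl_ms,
        "x-dead-letter-exchange": router_exchange,
    })

    for routing_key, queue in destinations.items():
        # Step 2: the delay queue accepts every routing key in use.
        channel.queue_bind(queue=delay_queue, exchange=delay_exchange,
                           routing_key=routing_key)
        # Step 4: each destination queue receives only its own routing key.
        channel.queue_declare(queue=queue, durable=True)
        channel.queue_bind(queue=queue, exchange=router_exchange,
                           routing_key=routing_key)


def main():
    # Needs pika installed and a broker on localhost; not called automatically.
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    declare_delay_topology(connection.channel())
    connection.close()
```

For the 1 hour and 2 hour delays from the question, one option (again an assumption, not something tested here) is to call declare_delay_topology twice with different delay_exchange and delay_queue names and ttl_ms values of 3600000 and 7200000, keeping the same router_exchange. The publisher then picks the delay by choosing which delay exchange to publish to.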

    This way, when publishing a message, I don't specify the queue, just the exchange and the routing_key:

        delay_channel.basic_publish(exchange='delay_exchange',
                      routing_key='helloC',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))
    

    The message is published to the DELAY_EXCHANGE, which routes it to the delay_queue, where it waits out its TTL. When the message expires, it is dead-lettered to the ROUTER_EXCHANGE with its original routing_key (no x-dead-letter-routing-key is set on the delay queue, so the key is preserved), and the ROUTER_EXCHANGE routes it to the matching destination queue. Awesome.
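To make that message path concrete, here is a toy in-memory model of it. It does not talk to RabbitMQ at all: DirectExchange and expire_all are hypothetical helpers written for this illustration, queues are plain Python lists, and the names helloA, helloB, queue_a and so on are assumptions. The only point is that the routing key given at publish time is what selects the destination queue after expiry.

```python
class DirectExchange:
    """Toy stand-in for a direct exchange: maps routing keys to bound queues."""

    def __init__(self):
        self.bindings = {}

    def bind(self, queue, routing_key):
        self.bindings.setdefault(routing_key, []).append(queue)

    def publish(self, routing_key, body):
        # Deliver to every queue bound with exactly this routing key.
        for queue in self.bindings.get(routing_key, []):
            queue.append((routing_key, body))


def expire_all(delay_queue, dead_letter_exchange):
    """Simulate TTL expiry: republish each message with its original key."""
    while delay_queue:
        routing_key, body = delay_queue.pop(0)
        dead_letter_exchange.publish(routing_key, body)


# Queues are plain lists in this model.
delay_queue, queue_a, queue_b, queue_c = [], [], [], []
delay_exchange, router_exchange = DirectExchange(), DirectExchange()

for key, destination in (("helloA", queue_a),
                         ("helloB", queue_b),
                         ("helloC", queue_c)):
    delay_exchange.bind(delay_queue, key)    # every key lands in the delay queue
    router_exchange.bind(destination, key)   # each key has its own destination

delay_exchange.publish("helloC", "test")
expire_all(delay_queue, router_exchange)
print(queue_c)  # [('helloC', 'test')]
```

In the real broker the expiry step happens on its own once the TTL elapses; the explicit expire_all call exists only because this model has no clock.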