php rabbitmq amqp

RabbitMQ: How to move failed message from one queue to another queue?


I have two queues, high_priority and high_priority_secondary. Running rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin shows:

+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost |          name           |          node           | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| /     | high_priority           | rabbit@server-rabbitmq  | 5        | 0.0                                |
| /     | high_priority_secondary | rabbit@server-rabbitmq  | 0        | 0.0                                |
+-------+-------------------------+-------------------------+----------+------------------------------------+

My exchanges (rabbitmqadmin -V / list exchanges -u admin -p admin) are listed below:

+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
|                         | direct  |
| amq.direct              | direct  |
| amq.fanout              | fanout  |
| amq.headers             | headers |
| amq.match               | headers |
| amq.rabbitmq.trace      | topic   |
| amq.topic               | topic   |
| high_priority           | direct  |
| high_priority_secondary | direct  |
| low_priority            | direct  |
+-------------------------+---------+

The queues and all related logic are implemented in PHP / Symfony; however, I would like to use native RabbitMQ functionality (if possible) via rabbitmqadmin or rabbitmqctl commands in the terminal.

If a message on the high_priority queue fails, I would like RabbitMQ to automatically move it to the high_priority_secondary queue without any PHP involvement. Is this possible? I've started reading about Dead Letter Exchanges but I'm not sure how to approach this.

I have already created a consumer for the secondary queue, so as soon as a message is moved there, it will be processed.

Is it possible to achieve this using the CLI only?

FYI: There are some suggested posts on SO that already cover the question, but none of the solutions is purely CLI-based.


Solution

  • OK, while I didn't have to modify any PHP code, I did have to change the YAML configuration at the framework level, because I wanted my solution to be persistent and part of the code base.

    In your app/config/services/rabbitmq.yaml:

    Define producers:

    high_priority:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'high_priority'
            type: direct
    high_priority_secondary:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'high_priority_secondary'
            type: direct
    message_hospital:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'message_hospital'
            type: direct
    

    Define consumers:

    high_priority:
        connection: default
        exchange_options:
            name: 'high_priority'
            type: direct
        queue_options:
            name: 'high_priority'
            arguments:
                x-dead-letter-exchange: ['S', 'high_priority_secondary']
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    high_priority_secondary:
        connection: default
        exchange_options:
            name: 'high_priority_secondary'
            type: direct
        queue_options:
            name: 'high_priority_secondary'
            arguments:
                x-dead-letter-exchange: ['S', 'message_hospital']
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    message_hospital:
        connection: default
        exchange_options:
            name: 'message_hospital'
            type: direct
        queue_options:
            name: 'message_hospital'
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    

    After this change there are three queues: high_priority, high_priority_secondary and message_hospital.

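    Thanks to the x-dead-letter-exchange arguments, a message that is rejected without requeue in high_priority is routed to high_priority_secondary, and if it is rejected there as well it ends up in the message_hospital queue.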
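
For readers who want the CLI-only variant the question asked for, the same chain can probably be set up with policies instead of queue arguments. The following is only a sketch under assumptions, not the configuration used above: it assumes vhost "/", the admin/admin credentials from the question, the classic Python-based rabbitmqadmin syntax, and that messages are published with an empty routing key (a dead-lettered message keeps its original routing key, so the binding has to match it). The policy names dlx-high_priority and dlx-high_priority_secondary and the APPLY switch are made up for this example. By default the script only prints the commands; set APPLY=1 to execute them.

```shell
#!/bin/sh
# Sketch: high_priority -> high_priority_secondary -> message_hospital via CLI only.
# Assumptions: vhost "/", admin/admin credentials, empty routing keys.

VHOST="/"

# run CMD ARGS...: print the command; execute it only when APPLY=1.
run() {
  printf '%s\n' "$*"
  if [ "${APPLY:-0}" = "1" ]; then
    "$@"
  fi
}

# dlx_policy NAME QUEUE TARGET: policy that makes QUEUE dead-letter into exchange TARGET.
# Policies can be applied to queues that already exist, unlike x-arguments.
dlx_policy() {
  run rabbitmqctl set_policy -p "$VHOST" "$1" "^$2\$" \
    "{\"dead-letter-exchange\":\"$3\"}" --apply-to queues
}

# declare_hospital: create the final exchange and queue and bind them together.
declare_hospital() {
  run rabbitmqadmin -u admin -p admin declare exchange name=message_hospital type=direct
  run rabbitmqadmin -u admin -p admin declare queue name=message_hospital durable=true
  run rabbitmqadmin -u admin -p admin declare binding source=message_hospital destination=message_hospital
}

declare_hospital
dlx_policy dlx-high_priority high_priority high_priority_secondary
dlx_policy dlx-high_priority_secondary high_priority_secondary message_hospital
```

Afterwards, rabbitmqctl list_queues name policy arguments should show which policy is attached to each queue. Note that RabbitMQ only dead-letters a message when it is rejected or nacked without requeue, when it expires, or when the queue length limit is exceeded, so the consumer still has to reject failed messages rather than requeue them.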