
How can I add dead-letter processing to MQTT communication in RabbitMQ?


I have MQTT communication where device "A" publishes a message with topic ex1.ex2.ex3, and device "B", which subscribes to the ex1.ex2.ex3 topic, cannot consume the message for some reason. Is there any way to route a message published by device "A" to a specific "queue" if it has not been consumed within a "specific time"? I know that MQTT doesn't have the concept of queues, but I want to implement message expiration and dead-letter processing with an MQTT version earlier than v5.

I've tried setting a TTL and a dead-letter exchange on all queues through a policy like this:

sudo rabbitmqctl set_policy TTL ".*" '{"message-ttl":3000, "dead-letter-exchange":"DeadLetterTestExchange", "dead-letter-routing-key":"test.deadletter.queue"}' --apply-to queues

but it still doesn't work. The MQTT communication uses QoS 0, and I would like to know the detailed method. Even in the official RabbitMQ documentation, I can't find anything about message expiration or dead-lettering for MQTT.


Solution

  • Using QoS-0 subscriptions creates queues that, with default RabbitMQ settings, cannot have dead-letter policies applied to them. Once you resolve that, you may run into a second issue, a message loop, if you try to consume dead-lettered messages on the same connection that consumes the regular (non-dead-lettered) messages. More details below:

    First issue: QoS-0

    The first issue is that you're subscribing to your topic with MQTT QoS 0. Under the hood, RabbitMQ binds a temporary queue for your subscription to the exchange configured by mqtt.exchange (amq.topic by default). If you subscribe with QoS 0, the temporary queue is of a special type that policies cannot apply to. From the docs:

    The MQTT QoS 0 queue type can be thought of as a “pseudo” or “virtual” queue: It is very different from the other queue types (classic queues, quorum queues, and streams) in the sense that this new queue type is neither a separate Erlang process nor does it store messages on disk. Instead, this queue type is a subset of the Erlang process mailbox. MQTT messages are directly sent to the MQTT connection process of the subscribing client. In other words, MQTT messages are sent to any “online” MQTT subscribers.

    A side effect of this special design is that policies do not apply to QoS-0 queues.

    Policies do, however, apply to QoS-1 queues. When you issue an MQTT subscribe at QoS-1, RabbitMQ creates a "normal" queue (whose type is configurable: quorum or classic) for the subscription and binds it to the configured mqtt.exchange.

    Here is an example screenshot of a QoS-0 subscriber and a QoS-1 subscriber in the RabbitMQ management UI, with a policy called deadletter that applies to queues matching the regex .*:

    [Screenshot: queues of a QoS-0 subscriber and a QoS-1 subscriber in the RabbitMQ management UI]

    Note that the QoS-0 queue has the special rabbit_mqtt_qos0_queue type (which is also configurable) and is not receiving the deadletter policy.

    In short: you'll have to subscribe to an MQTT topic at QoS-1, and publish messages at QoS-1.

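    Alternatively, you may be able to make QoS-0 subscriptions use normal queues, to which policies apply, by leaving the rabbit_mqtt_qos0_queue RabbitMQ feature flag disabled. Note that RabbitMQ feature flags generally cannot be disabled once they have been enabled, so this is mainly an option for clusters where the flag has not been enabled yet.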
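    To make the QoS rules above concrete, here is a small toy model in Python of which subscription queues a queue-targeting policy ends up on. It assumes the queue naming scheme used in this answer (mqtt-subscription-<client ID>qos<level>; the qos0 form is assumed by analogy) and that QoS-0 subscriptions use the rabbit_mqtt_qos0_queue type. The function names are hypothetical and nothing here talks to a broker.

```python
import re


def subscription_queue_name(client_id: str, qos: int) -> str:
    """Queue name for an MQTT subscription (naming scheme assumed from this answer)."""
    return f"mqtt-subscription-{client_id}qos{qos}"


def policy_applies(pattern: str, client_id: str, qos: int) -> bool:
    """Toy model: would a queue policy with this regex take effect on the subscription queue?"""
    if qos == 0:
        # Assumption: QoS-0 subscriptions use the rabbit_mqtt_qos0_queue pseudo-queue type,
        # which policies never apply to, regardless of the pattern.
        return False
    # Policy patterns are regular expressions matched anywhere in the queue name (unanchored),
    # hence re.search rather than re.fullmatch.
    return re.search(pattern, subscription_queue_name(client_id, qos)) is not None


if __name__ == "__main__":
    # A catch-all ".*" policy reaches the QoS-1 queue but not the QoS-0 one.
    for qos in (0, 1):
        print(qos, subscription_queue_name("device_b", qos), policy_applies(".*", "device_b", qos))
```

    This is why a ".*" policy like the one in the question has no effect on a QoS-0 subscriber.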

    Second issue: message loops when subscribing from the same consumer

    That's not all that's needed to resolve your issue, however. If you create two QoS-1 subscriptions on the same MQTT connection, RabbitMQ implements them as two bindings (one per MQTT topic) to the same queue. If that queue has a dead-letter policy that routes back to itself (even indirectly, via a dead-letter exchange that routes to the same queue through a different routing key), RabbitMQ will not deliver the dead-lettered message to it. This appears to be RabbitMQ's protection against infinite dead-letter loops: according to RabbitMQ's dead-lettering documentation, messages that reach the same queue twice in a dead-letter cycle are dropped if there were no rejections anywhere in the cycle.

    As a result, you have to choose one of two options:

    1. If consuming from the DLQ in the same MQTT connection that subscribes to the non-dead-lettered messages, subscribe to the "main" topic at QoS-1 and the DLQ topic at QoS-0.
    2. Run a separate MQTT consumer of the DLQ topic (at any QoS level). I recommend this approach.

    The second option also lets you write the dead-letter policy so that it excludes the DLQ consumer's queue (relevant only if the DLQ consumer subscribes at QoS-1, since only then can policies apply to its queue). This prevents messages that have already been dead-lettered from expiring again if the DLQ consumer can't get to them in time.
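    The loop-dropping behavior described above can be illustrated with a toy model of the documented rule: a dead-lettered message that would reach a queue it already died in is dropped unless the cycle contained a rejection. This is a hypothetical sketch, not RabbitMQ's implementation; the queue names follow the naming scheme used elsewhere in this answer, and "expired" and "rejected" stand in for the dead-letter reasons.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: str
    # History of (queue it died in, reason), similar in spirit to the x-death header.
    deaths: list = field(default_factory=list)


def dead_letter(msg: Message, source_queue: str, reason: str, bound_queues: list) -> list:
    """Toy model: return the queues that actually receive the dead-lettered message."""
    msg.deaths.append((source_queue, reason))
    no_rejections = all(r != "rejected" for _, r in msg.deaths)
    delivered = []
    for queue in bound_queues:
        died_here_before = any(q == queue for q, _ in msg.deaths)
        if died_here_before and no_rejections:
            continue  # Cycle without any rejection: the message is dropped for this queue.
        delivered.append(queue)
    return delivered


if __name__ == "__main__":
    shared = "mqtt-subscription-main_clientqos1"    # one queue bound to both non_dlq and dlq
    separate = "mqtt-subscription-dlq_clientqos1"   # DLQ consumer on its own connection

    # Same connection, both subscriptions at QoS-1: the DLX routes back into the same queue.
    print(dead_letter(Message("m1"), shared, "expired", [shared]))
    # Separate DLQ consumer: the message lands in a different queue and is delivered.
    print(dead_letter(Message("m2"), shared, "expired", [separate]))
```

    The model shows why option 2 works: the dead-lettered message never revisits the queue it expired in.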

    How to avoid these issues

    1. Create a RabbitMQ policy like the one in your question, with a dead-letter-exchange value that matches your mqtt.exchange RabbitMQ config (default is amq.topic) and a dead-letter-routing-key that corresponds to the MQTT topic your DLQ consumer subscribes to. The regex can be .* if you are consuming DLQ'd messages from a QoS-0 subscription.
      • If you are consuming DLQ'd messages on a QoS-1 subscription (e.g. because you want messages persisted after consumers shut down), the regex should match the queues of the subscriptions whose messages may expire, and should exclude the queue used by the dead-letter consumer. Queue names are generated by RabbitMQ from the MQTT client ID, yielding something like mqtt-subscription-dlqconsumerqos1.
    2. Subscribe to non-dead-lettered messages using a QoS-1 subscription.
    3. Subscribe to the dead-letter topic either at QoS-1 in a separate MQTT connection, or at QoS-0 on the same connection as is subscribed to the non-DLQ topic.
    4. Publish messages to the non-dead-lettered topic at QoS 1.
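    Step 1 can be scripted. The sketch below builds the rabbitmqctl set_policy arguments as a Python list, so they can be passed to subprocess.run without shell-quoting the JSON definition. The helper name, the policy name, and the anchored pattern are illustrative assumptions; the pattern relies on the mqtt-subscription-<client ID>qos1 queue naming described above and targets only the primary consumer's QoS-1 queue, leaving the DLQ consumer's queue untouched. If your DLQ topic contains / separators, note that the MQTT plugin translates MQTT topic separators (/) to AMQP routing-key separators (.), so the dead-letter-routing-key should use dots.

```python
import json
import re


def build_dlx_policy_argv(policy_name: str, client_id: str, ttl_ms: int,
                          dlx: str = "amq.topic", dl_routing_key: str = "dlq") -> list:
    """Hypothetical helper: rabbitmqctl argv for a TTL + dead-letter policy on one client's QoS-1 queue."""
    # Anchored regex for the client's QoS-1 subscription queue; re.escape guards odd client IDs.
    pattern = f"^mqtt-subscription-{re.escape(client_id)}qos1$"
    definition = {
        "message-ttl": ttl_ms,                      # expire messages waiting longer than ttl_ms
        "dead-letter-exchange": dlx,                # should match mqtt.exchange (amq.topic by default)
        "dead-letter-routing-key": dl_routing_key,  # routing key of the DLQ topic
    }
    return ["rabbitmqctl", "set_policy", policy_name, pattern,
            json.dumps(definition), "--apply-to", "queues"]


if __name__ == "__main__":
    argv = build_dlx_policy_argv("TTL", "main_client", 2000)
    print(argv)
    # To apply it for real (requires rabbitmqctl on PATH and access to the node):
    # import subprocess; subprocess.run(argv, check=True)
```

    Passing a list instead of a shell string sidesteps the single/double quote juggling in the commands shown in this thread.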

    Example: Using two processes to consume dead-lettered messages

    The Python below illustrates how consuming DLQ'd MQTT messages works with two separate consumer processes.

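    RabbitMQ policy that expires messages in the primary consumer's (main_client's) subscription queue after 2 seconds. The pattern must match the primary consumer's queue and exclude the DLQ consumer's (dlq_client's) queue:

    rabbitmqctl set_policy TTL "mqtt-subscription-main_client.*" '{"message-ttl":2000, "dead-letter-exchange":"amq.topic", "dead-letter-routing-key":"dlq"}' --apply-to queues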
    

    Primary consumer of non-DLQ messages, primary.py, which takes 3 seconds to process each message. This script is also the publisher, though messages can be published from anywhere as long as they are published at QoS-1:

    from paho.mqtt.client import Client, MQTTMessage, MQTTv5
    from paho.mqtt.enums import CallbackAPIVersion
    from paho.mqtt.packettypes import PacketTypes
    from paho.mqtt.properties import Properties
    import time
    
    def on_connect(client: Client, _, __, result, *___):
        client.subscribe('non_dlq', qos=1)
        client.publish('non_dlq', 'message1', qos=1)
        client.publish('non_dlq', 'message2', qos=1)
        client.publish('non_dlq', 'message3', qos=1)
        print(f"Connected: {result}")
    
    def on_message(client: Client, _, msg: MQTTMessage):
        print(f"Primary: Got message from {msg.topic}: {msg.payload}")
        # Sleep before acknowledging the message to force other ready messages in the queue to expire and get dead-lettered.
        time.sleep(3)
    
    client = Client(CallbackAPIVersion.VERSION2, client_id="main_client", protocol=MQTTv5)
    client.on_connect = on_connect
    client.on_message = on_message
    
    # Connect the main consumer with a Receive Maximum of 1 (at most one unacknowledged message in flight) so that
    # dead-lettering actually occurs: RabbitMQ only expires messages still waiting in the queue, not messages that
    # have already been delivered to a consumer.
    connect_properties = Properties(PacketTypes.CONNECT)
    connect_properties.ReceiveMaximum = 1
    client.connect("localhost", properties=connect_properties)
    client.loop_forever()
    

    DLQ Consumer, dlq.py:

    from paho.mqtt.client import Client, MQTTMessage, MQTTv5
    from paho.mqtt.enums import CallbackAPIVersion
    
    def on_connect(client: Client, _, __, result, *___):
        client.subscribe('dlq', qos=1)
        print(f"Connected: {result}")
    
    def on_message(client: Client, _, msg: MQTTMessage):
        print(f"DLQ: Got message from {msg.topic}: {msg.payload}")
    
    client = Client(CallbackAPIVersion.VERSION2, client_id="dlq_client", protocol=MQTTv5)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("localhost")
    client.loop_forever()
    

    Apply the RabbitMQ policy, start dlq.py first, then run primary.py. (Both scripts use paho-mqtt 2.x and MQTT 5, which RabbitMQ supports starting with version 3.13.) The output of primary.py should be:

    Connected: Success
    Primary: Got message from non_dlq: b'message1'
    

    This indicates that it received only the first of the three messages from the primary, non-DLQ topic.

    The output of dlq.py should be:

    Connected: Success
    DLQ: Got message from dlq: b'message2'
    DLQ: Got message from dlq: b'message3'
    

    This indicates that the second and third messages expired and were delivered via the dead-letter topic.
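    The expected outputs follow from simple timing arithmetic. The toy simulation below is hypothetical and much simpler than a real broker (all messages are published at t=0, delivery is strictly in order, and a message is dead-lettered if it is still waiting in the queue once its TTL has elapsed), but it reproduces the outputs and shows why the Receive Maximum of 1 in primary.py matters.

```python
def simulate(num_messages: int, ttl_s: float, processing_s: float, receive_maximum: int = 1):
    """Toy model: return (processed, dead_lettered) message numbers for one consumer."""
    # Time at which each in-flight slot (bounded by Receive Maximum) becomes free.
    slot_free_at = [0.0] * receive_maximum
    processed, dead_lettered = [], []
    for n in range(1, num_messages + 1):
        # The next message goes to whichever slot frees up first.
        slot = min(range(receive_maximum), key=lambda s: slot_free_at[s])
        deliver_at = slot_free_at[slot]
        if deliver_at >= ttl_s:
            # Toy threshold: still waiting in the queue when its TTL ran out, so it is dead-lettered.
            dead_lettered.append(n)
            continue
        processed.append(n)
        slot_free_at[slot] = deliver_at + processing_s
    return processed, dead_lettered


if __name__ == "__main__":
    # Parameters from the example: 3 messages, 2 s TTL, 3 s processing per message.
    print(simulate(3, ttl_s=2, processing_s=3, receive_maximum=1))
    # Without the Receive Maximum limit, all three are handed to the consumer immediately.
    print(simulate(3, ttl_s=2, processing_s=3, receive_maximum=3))
```

    With three in-flight slots, every message is delivered before its TTL runs out and nothing reaches the DLQ, which is the situation the ReceiveMaximum = 1 setting in primary.py avoids.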