I have mqtt communication and device "A" issues a message with topic ex1.ex2.ex3 and device "B" subscribing to the ex1.ex2.ex3 topic cannot consume the message for some reason. Is there any way to get this message issued by device "A" to go to a specific "queue" after a "specific time" without the message being consumed? I know that mqtt communication doesn't have the concept of queues, but I want to do message expiration and dead letter processing in the earlier version of mqtt V5.
I've tried setting ttl for all queues and exchanges through policy settings like this
sudo rabbitmqctl set_policy TTL ".*" '{"message-ttl":3000, "dead-letter-exchange":"DeadLetterTestExchange", "dead-letter-routing-key":"test.deadletter.queue"}' --apply-to queues
, but there are still difficulties. The mqtt communication is done with qos 0 and I would like to know the detailed method. Even if I check the official Rabbitmq document, I can't see the expiration processing of the message or the wording related to DeadLetter in mqtt.
The use of QoS-0 publishes and subscriptions is creating queues that cannot use dead-letter policies with default RabbitMQ settings. Once you resolve that issue, you run the risk of encountering a second issue regarding message loops if you're trying to consume DLQ'd messages on the same connection that's consuming non-DLQ'd messages. More details below:
The first is that you're subscribing to your topic with MQTT QoS 0. Under the hood, RabbitMQ is binding a temporary queue to the default mqtt.exchange
(amq.topic
usually) for your subscription. If you subscribe with QoS 0, the temporary queue created is of a special type that policies cannot apply to. From the docs:
The MQTT QoS 0 queue type can be thought of as a “pseudo” or “virtual” queue: It is very different from the other queue types (classic queues, quorum queues, and streams) in the sense that this new queue type is neither a separate Erlang process nor does it store messages on disk. Instead, this queue type is a subset of the Erlang process mailbox. MQTT messages are directly sent to the MQTT connection process of the subscribing client. In other words, MQTT messages are sent to any “online” MQTT subscribers.
A side effect of that special-ness is that policies do not apply to QoS-0 queues.
On the other hand, policies do apply to QoS-1 queues. When you issue an MQTT subscribe at QoS-1, RabbitMQ creates a "normal" queue object (whose type is configurable: quorum or classic) for the subscription and binds it to the default mqtt.exchange
.
QoS-1 queues have policies apply to them normally. Here is an example screenshot of a QoS-0 subscriber and a QoS-1 subscriber in the RabbitMQ UI, in the presence of a policy called deadletter
that applies to queues with the regex .*
:
Note that the QoS-0 queue has the special rabbit_mqtt_qos0_queue
type (which is also configurable) and is not receiving the deadletter
policy.
In short: you'll have to subscribe to an MQTT topic at QoS-1, and publish messages at QoS-1.
You may alternatively be able to force QoS-0 queues to receive policies normally by altering the rabbit_mqtt_qos0_queue
RabbitMQ feature flag.
That's not all that's needed to resolve your issue, however. If you create two QoS-1 subscriptions on the same MQTT connection, RabbitMQ will implement those as two bindings (one per MQTT topic) to the same temporary queue. If that queue has a dead-letter policy routing to itself (even indirectly via a dead-letter exchange that routes to the bound queue via a different routing key), RabbitMQ will not deliver the message to it. I suspect, but am not certain, that this is due to RabbitMQ's attempting to prevent an infinite message loop.
As a result, you have to choose one of two options:
Doing the latter also allows you to craft the dead-lettering policy to exclude the second consumer's queue (if the DLQ consumer is at QoS-1 such that policies even can apply to its queue), preventing messages that were already dead-lettered from expiring if the dead-letter consumer can't get to them in time.
dead-letter-exchange
value that matches your mqtt.exchange
RabbitMQ config (default is amq.topic
). The regex can be .*
if you are consuming DLQ'd messages from a QoS-0 subscription.
mqtt-subscription-dlqconsumerqos1
)The below Python illustrates how consumption of DLQ'd MQTT messages would work using two separate consumer processes.
RabbitMQ Policy to expire messages after 2 seconds:
rabbitmqctl set_policy TTL "mqtt-subscription-dlq_client.*" '{"message-ttl":2000, "dead-letter-exchange":"amq.topic", "dead-letter-routing-key":"dlq"}' --apply-to queues
Primary consumer of non-DLQ messages, which takes 3 seconds to process each message. This script is also the publisher, though publishes can come from anywhere so long as they're QoS-1, primary.py
:
from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import time
def on_connect(client: Client, _, __, result, *___):
client.subscribe('non_dlq', qos=1)
client.publish('non_dlq', 'message1', qos=1)
client.publish('non_dlq', 'message2', qos=1)
client.publish('non_dlq', 'message3', qos=1)
print(f"Connected: {result}")
def on_message(client: Client, _, msg: MQTTMessage):
print(f"Primary: Got message from {msg.topic}: {msg.payload}")
# Sleep before acknowledging the message to force other ready messages in the queue to expire and get dead-lettered.
time.sleep(3)
client = Client(CallbackAPIVersion.VERSION2, client_id="main_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
# Connect the main consumer with a maximum of 1 local messages to force dead-lettering to occur correctly: RabbitMQ's
# expiration only applies to messages that have not been claimed by a consumer.
connect_properties = Properties(PacketTypes.CONNECT)
connect_properties.ReceiveMaximum = 1
client.connect("localhost", properties=connect_properties)
client.loop_forever()
DLQ Consumer, dlq.py
:
from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client: Client, _, __, result, *___):
client.subscribe('dlq', qos=1)
print(f"Connected: {result}")
def on_message(client: Client, _, msg: MQTTMessage):
print(f"DLQ: Got message from {msg.topic}: {msg.payload}")
client = Client(CallbackAPIVersion.VERSION2, client_id="dlq_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost")
client.loop_forever()
Apply the RabbitMQ policy, start dlq.py
first, then run primary.py
. The output of primary.py
should be:
Connected: Success
Primary: Got message from non_dlq: b'message1'
Indicating that it received message 1 of 3 from the primary, non-DLQ topic.
The output of dlq.py
should be:
Connected: Success
DLQ: Got message from dlq: b'message2'
DLQ: Got message from dlq: b'message3'
Indicating that it got the second and third messages from the dead-letter queue.