Search code examples
pythonmqttpaho

PAHO MQTT Python client - acknowledgement missing, guaranteed delivery for subscriber


Fellow developers,

I am looking at the Paho MQTT client (for Python) and I think I understand that the QoS setting for the publisher, e.g. a sensor or any data source really, does make sense - you want to be able to make sure a message has been received (by the broker/server) after all.

I also think it makes sense from a subscribers perspective to request e.g. QoS "2" in order to make sure the MQTT server does indeed send each message to the subscriber, but I am struggling with this: there does not seem to be a way for the subscriber to signal successful (or not) processing of a received message, in other words, some way of explicit acknowledgement seems to be missing?

Use case - successfully process messages

I would like to subscribe to a topic and process each data point successfully, e.g. by storing to a database. Therefore I need to accommodate the situation when a subscriber fails "in-flight", ie after receiving a message from the broker but before successful processing (storing to DB).

Assumptions

Now, should a subscriber (fixed client_id) fail while processing data it would restart and then reconnect to the MQTT broker, the broker identifies this particular client by id and starts pushing messages again - starting with the next message after the subscriber disconnected - as far as the broker is concerned it did successfully deliver this last message (cannot know, that the subscriber crashed).

Potential solution

If above assumption is true, then I would be better off not using a fixed client_id but rather "clean_session" and a random client_id; this way, the broker starts delivering all the messages is has persisted for a particular topic. This would of course put the responsibility of keeping track of successfully processed messages on the subscriber.

Is this how it needs to be done? Or is there a way to explicitly acknowledge successful processing of a message by the subscriber, so the broker can retransmit should need be - I am particular interested for the Paho Python library.

Thanks in advance!

EDIT 1:

relevant code:

def _handle_on_message(self, message):
    matched = False
    with self._callback_mutex:
        try:
            topic = message.topic
        except UnicodeDecodeError:
            topic = None

        if topic is not None:
            for callback in self._on_message_filtered.iter_match(message.topic):
                with self._in_callback:
                    callback(self, self._userdata, message)
                matched = True

        if matched == False and self.on_message:
            with self._in_callback:
                self.on_message(self, self._userdata, message)

source: https://github.com/eclipse/paho.mqtt.python/blob/v1.3.1/src/paho/mqtt/client.py#L2631

EDIT 2:

@hardlib is indeed correct- when failing inside the callback function, the client code will not acknowledge to the broker

some code for illustration

on_message callback in subscriber

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload)+ " mid:" + str(msg.mid))
    num = ''.join(x for x in str(msg.payload) if x.isdigit())
    if int(num) % 3 == 0:
        print("Going to show myself out now (sys.exit(1))")
        time.sleep(1)
        sys.exit(1)

publisher

while True:
    count += 1
    logging.debug("At: " + str(count))
    msg = "message: {counter}".format(counter=count)
    mqttc.publish("paho/stacko", msg, qos=2, retain=False)

Logs

publisher

root@14f00c2576b2:/usr/src/app# python publisher.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
DEBUG:root:At: 1
DEBUG:root:Sending PUBLISH (d0, q2, r0, m1), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Received PUBREC (Mid: 1)
DEBUG:root:Sending PUBREL (Mid: 1)
DEBUG:root:Received PUBCOMP (Mid: 1)
DEBUG:root:At: 2
DEBUG:root:Sending PUBLISH (d0, q2, r0, m2), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 2)
DEBUG:root:Sending PUBREL (Mid: 2)
DEBUG:root:Received PUBCOMP (Mid: 2)
DEBUG:root:At: 3
DEBUG:root:Sending PUBLISH (d0, q2, r0, m3), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 3)
DEBUG:root:Sending PUBREL (Mid: 3)
DEBUG:root:Received PUBCOMP (Mid: 3)
DEBUG:root:At: 4
DEBUG:root:Sending PUBLISH (d0, q2, r0, m4), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 4)
DEBUG:root:Sending PUBREL (Mid: 4)
DEBUG:root:Received PUBCOMP (Mid: 4)
DEBUG:root:At: 5
DEBUG:root:Sending PUBLISH (d0, q2, r0, m5), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 5)
DEBUG:root:Sending PUBREL (Mid: 5)
DEBUG:root:Received PUBCOMP (Mid: 5)
DEBUG:root:At: 6
DEBUG:root:Sending PUBLISH (d0, q2, r0, m6), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 6)
DEBUG:root:Sending PUBREL (Mid: 6)
DEBUG:root:Received PUBCOMP (Mid: 6)
DEBUG:root:At: 7
DEBUG:root:Sending PUBLISH (d0, q2, r0, m7), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 7)
DEBUG:root:Sending PUBREL (Mid: 7)
DEBUG:root:Received PUBCOMP (Mid: 7)
DEBUG:root:At: 8
DEBUG:root:Sending PUBLISH (d0, q2, r0, m8), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 8)
DEBUG:root:Sending PUBREL (Mid: 8)
DEBUG:root:Received PUBCOMP (Mid: 8)
DEBUG:root:At: 9
DEBUG:root:Sending PUBLISH (d0, q2, r0, m9), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 9)
DEBUG:root:Sending PUBREL (Mid: 9)
DEBUG:root:Received PUBCOMP (Mid: 9)
DEBUG:root:At: 10
DEBUG:root:Sending PUBLISH (d0, q2, r0, m10), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 10)
DEBUG:root:Sending PUBREL (Mid: 10)
DEBUG:root:Received PUBCOMP (Mid: 10)
DEBUG:root:At: 11
DEBUG:root:Sending PUBLISH (d0, q2, r0, m11), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 11)
DEBUG:root:Sending PUBREL (Mid: 11)
DEBUG:root:Received PUBCOMP (Mid: 11)

subscriber

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m1), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 1)
DEBUG:root:Received PUBREL (Mid: 1)
DEBUG:root:Message!!!! b'message: 2'
paho/stacko b'message: 2' mid:1
Num: 2
DEBUG:root:Sending PUBCOMP (Mid: 1)
DEBUG:root:Received PUBLISH (d0, q2, r0, m2), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Message!!!! b'message: 3'
paho/stacko b'message: 3' mid:2
Num: 3
Going to show myself out now (sys.exit(1))

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m3), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 3)
DEBUG:root:Received PUBLISH (d0, q2, r0, m4), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 4)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBREL (Mid: 3)
DEBUG:root:Message!!!! b'message: 4'
paho/stacko b'message: 4' mid:3
Num: 4
DEBUG:root:Sending PUBCOMP (Mid: 3)
DEBUG:root:Received PUBREL (Mid: 4)
DEBUG:root:Message!!!! b'message: 5'
paho/stacko b'message: 5' mid:4
Num: 5
DEBUG:root:Sending PUBCOMP (Mid: 4)
DEBUG:root:Received PUBLISH (d0, q2, r0, m5), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Message!!!! b'message: 6'
paho/stacko b'message: 6' mid:5
Num: 6
Going to show myself out now (sys.exit(1))

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m6), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 6)
DEBUG:root:Message!!!! b'message: 7'
paho/stacko b'message: 7' mid:6
Num: 7
DEBUG:root:Sending PUBCOMP (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m7), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 7)
DEBUG:root:Received PUBREL (Mid: 7)
DEBUG:root:Message!!!! b'message: 8'
paho/stacko b'message: 8' mid:7
Num: 8
DEBUG:root:Sending PUBCOMP (Mid: 7)
DEBUG:root:Received PUBLISH (d0, q2, r0, m8), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 8)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Message!!!! b'message: 9'
paho/stacko b'message: 9' mid:8
Num: 9
Going to show myself out now (sys.exit(1))
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m9), 'paho/stacko', ...  (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 9)
DEBUG:root:Message!!!! b'message: 10'
paho/stacko b'message: 10' mid:9
Num: 10
DEBUG:root:Sending PUBCOMP (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBLISH (d0, q2, r0, m10), 'paho/stacko', ...  (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 10)
DEBUG:root:Message!!!! b'message: 11'
paho/stacko b'message: 11' mid:10
Num: 11
DEBUG:root:Sending PUBCOMP (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 2)

Conclusion (for now)

MQTT (at least Mosquitto) does work as intended in terms of persistence: if a client reconnects it can do "catching up" for those messages missed since last connected. However, even with qos=2 set for both, subscriber and publisher, that last message before a crash will not be reprocessed


Solution

  • If there is a chance that your client may fail while processing a message it is up to you to store the message somewhere (probably on disk or in a database) before you start to process it.

    You can then check this storage when the client restarts and attempt to process it again. If you do this before reconnecting (with the fixed client_id) then you don't have to worry about new messages being delivered while you try to deal with the failing message.

    EDIT:

    Also looking at the in more detail at the code, for QOS2 the last leg of the delivery confirmation appears to only be sent after on_message has completed, so if you crash handling the message in the on_message then the broker should redeliver that message.

    https://github.com/eclipse/paho.mqtt.python/blob/e9914a759f9f5b8081d59fd65edfd18d229a399e/src/paho/mqtt/client.py#L2506