Search code examples
mqttpahopython-paho

How to keep reestablishing connection to an mqtt broker with paho mqtt


With the following code I am able to force the mqtt client to keep reestablishing the connection to the message broker even if it is not up yet. I would like to get it does via the use of custom try/except as implemented below.

However to me it seems there should be a native way to get this working.

import time

import paho.mqtt.client as mqtt

def main():
    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(client, userdata, flags, reason_code, properties):
        print(f"Connected with result code {reason_code}")
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe("sensors")

    # The callback for when a PUBLISH message is received from the server.
    def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))

    def on_connect_fail(client, userdata, flags, reason_code, properties):
        print(f"Connection failed with result code {reason_code}")

    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv311)
    # mqttc.enable_logger()
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_connect_fail = on_connect_fail

    mqttc.username_pw_set("username", "password")
    mqttc.tls_set(ca_certs="ca.crt", certfile="client.crt", keyfile="client.key")

    # Blocking call that processes network traffic, dispatches callbacks and
    # handles reconnecting.
    # Other loop*() functions are available that give a threaded interface and a
    # manual interface.
    try:
        mqttc.connect("mqtt.client.server.com", port=8883, keepalive=60)
        mqttc.loop_forever()
    except (TimeoutError, ConnectionRefusedError) as e:
        print("Error: ", e)
        time.sleep(10)
        print("Attempting again")
        main()

if __name__ == "__main__":
    main()

Workflow:

  1. Broker machine is stopped

  2. Script is started

  3. Broker machine is started

  4. Broker is started

  5. Client is successfully connected

❯ poetry run paho
Warning: 'paho' is an entry point defined in pyproject.toml, but it's not installed as a script. You may get improper `sys.argv[0]`.

The support to run uninstalled scripts will be removed in a future release.

Run `poetry install` to resolve and get rid of this message.

0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  timed out
Attempting again
0
Error:  [Errno 61] Connection refused
Attempting again
0
Error:  timed out
Attempting again
0
Connected with result code Success

I tried setting loop_forever to retry 1st connection to true, but it did not work.

mqttc.loop_forever(retry_first_connection=True)

Solution

  • You are calling mqttc.connect("mqtt.client.server.com", port=8883, keepalive=60), and, as per the docs connect:

    is a blocking call that establishes the underlying connection and transmits a CONNECT packet.

    The exception you are seeing is being thrown by connect, meaning that the retry_first_connection=True you are passing to loop_forever is irrelevant (because that line is not run).

    The simplest solution is to avoid the blocking call connect, and use connect_async instead i.e.:

    mqttc.connect_async("mqtt.client.server.com", port=1883, keepalive=60)
    try:
       mqttc.loop_forever(retry_first_connection=True)
    

    connect_async will not attempt to connect directly, instead it configures things so that the connection will be established when the network loop is running (meaning that the retry_first_connection will work as expected).

    Note: covered in this issue.