Search code examples
pythonmqtt

Paho MQTT dynamically subscribe to new topic


I am still learning how to use MQTT to publish and subscribe.

Is it possible to subscribe dynamically to a new topic when the user adds one?

For example, the code right now is something like this:

import anotherModule

# Topics to subscribe to, read from anotherModule. The value looks like
# [('Test1', 0), ('Test2', 0)] -- presumably (topic, qos) pairs; confirm.
topic = anotherModule.topic
...

def connect_mqtt() -> mqtt_client.Client:
    """Create an MQTT client, register its connect callback and connect.

    Reads the module-level ``client_id``, ``username``, ``password``,
    ``broker`` and ``port`` settings.

    Returns:
        The client on which ``connect`` has been called. No network loop is
        started here; the caller is expected to run one.
    """
    def on_connect(client, userdata, flags, rc):
        # rc == 0 is treated as a successful connection.
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # Interpolate rc into the message. Passing it as a second
            # argument to print() would leave the literal "%d" in the output.
            print("Failed to connect, return code %d\n" % rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client.Client):
    """Attach the message handler and start polling for topics.

    A background thread calls ``client.subscribe(anotherModule.topic)`` once
    per second, so topics appended to ``anotherModule.topic`` after start-up
    are picked up as well.

    Args:
        client: The MQTT client to subscribe with.
    """
    def on_message(client, userdata, msg):
        # Payloads are expected to be JSON text. Anything that cannot be
        # decoded is reported and skipped instead of raising out of the
        # callback.
        try:
            new_msg = json.loads(msg.payload.decode())
        except ValueError as err:
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            print("Ignoring message with undecodable payload:", err)
            return
        print(new_msg)

    def check_topic():
        while True:
            # Re-send the whole topic list so newly added topics get
            # subscribed.
            client.subscribe(anotherModule.topic)
            time.sleep(1)

    # Register the handler before the poller starts issuing subscriptions.
    client.on_message = on_message
    t3 = threading.Thread(target=check_topic)
    t3.start()

# Start-up: connect, install the subscription logic, then run the connection
# checker and the client's network loop on their own threads.
client = connect_mqtt()
subscribe(client)

# periodic_check_connection is irrelevant to the question.
t1 = threading.Thread(target=periodic_check_connection)
t2 = threading.Thread(target=client.loop_forever)
t1.start()
t2.start()

Right now, I am using threading to run the subscribe call in a while loop on another thread. This way, it will always subscribe to any new topic the user adds.

Are there any better ways to do this? If the user were to subscribe to, say, 'Test3', then ('Test3', 0) would be appended to anotherModule.topic. However, the MQTT client does not subscribe to this new topic automatically, which is why the while loop was introduced.

Any help is appreciated. Thank you!

Edit 1:

def connect_mqtt() -> mqtt_client.Client:
    """Create an MQTT client that subscribes to the known topics on connect.

    Reads the module-level ``client_id``, ``username``, ``password``,
    ``broker`` and ``port`` settings and the topic list in
    ``anotherModule.topic``.

    Returns:
        The client on which ``connect`` has been called. No network loop is
        started here; the caller is expected to run one.
    """
    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            # Interpolate rc into the message. Passing it as a second
            # argument to print() would leave the literal "%d" in the output.
            print("Failed to connect, return code %d\n" % rc)
            return
        print("Connected to MQTT Broker!")
        # Subscribe from the connect callback instead of right after
        # connect(): this is the pattern the paho documentation recommends,
        # so that subscriptions are issued on every successful (re)connect.
        if anotherModule.topic:
            client.subscribe(anotherModule.topic)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def update_topic(new_topic):
    """Subscribe the shared MQTT client to ``new_topic``.

    Args:
        new_topic: Forwarded unchanged to ``client.subscribe``.

    Raises:
        RuntimeError: If the module-level ``client`` has not been created
            yet (it is only bound once ``run()`` has executed).
    """
    # Look the client up at call time. The global does not exist until run()
    # has assigned it, and a bare reference would fail with an opaque
    # NameError.
    active_client = globals().get("client")
    if active_client is None:
        raise RuntimeError(
            "MQTT client is not initialised; call run() before update_topic()"
        )
    active_client.subscribe(new_topic)

def run():
    """Connect, install the subscription logic and start the worker threads.

    Binds the connected client to the module-level name ``client`` so other
    functions in this module can reach it.
    """
    global client
    client = connect_mqtt()
    subscribe(client)
    # One thread per long-running job: the connection checker first, then
    # the client's network loop.
    for job in (periodic_check_connection, client.loop_forever):
        worker = threading.Thread(target=job)
        worker.start()

I have tried declaring client as global, but it doesn't seem to work. It says name 'client' is not defined at client.subscribe(new_topic).


Solution

  • I have managed to solve this by making anotherModule a class. I instantiate the class from anotherModule and pass in the client from the current module, so that I can call client.subscribe directly from anotherModule rather than calling the function. Thank you for your help!

    # current module
    def run():
        """Connect, hand the client to anotherModule and start the threads."""
        client = connect_mqtt()
        # The client is passed to the other module so it can subscribe itself.
        anotherModule.ModuleClass(client)
        subscribe(client)
        # One thread per long-running job: the connection checker first,
        # then the client's network loop.
        for job in (periodic_check_connection, client.loop_forever):
            worker = threading.Thread(target=job)
            worker.start()
    
    # anotherModule
    class ModuleClass:
        """Holds the MQTT client so this module can subscribe to new topics."""

        def __init__(self, client):
            """Store the client that later subscriptions are issued on.

            Args:
                client: Object exposing ``subscribe(topic)``.
            """
            self.client = client
            # ... (remaining initialisation elided in the original post)

        def updateTopic(self, new_topic):
            """Subscribe the stored client to ``new_topic``.

            Args:
                new_topic: Forwarded unchanged to ``self.client.subscribe``.
            """
            # ``self`` must be the first parameter. Without it the instance
            # is bound to ``new_topic`` and the call fails.
            self.client.subscribe(new_topic)