Search code examples
pythonmqtt

How send and recived mqtt message in same script


Hi can someone help me? I´m trying to send every 2 minutes a mqtt message but I also need to receive messages at any time.

The message every 2 minutes is the state message to know that the server is running and i need to receive messages to execute the shutdown commands.

I try this but this only send one message...

import time
from paho.mqtt import client as mqtt_client
from paramiko import SSHClient, AutoAddPolicy

broker = '192.168.15.210'
port = 1883
topic = "LV-Automation/Server"
topicState = "LV-Automation/Server/State"
client_id = 'ServerPower'

msgState = f"ON"

lastms = 10

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set("user", "password")
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        message = msg.payload.decode()
        if message == "OFF":
            client.publish(topicState, "OFF")
            client = SSHClient()
            client.load_system_host_keys()
            client.set_missing_host_key_policy(AutoAddPolicy())
            client.connect('192.168.15.220', username= 'root', password= 'Caro1992')
            stdin, stdout, stderr = client.exec_command('powerdown')


    client.subscribe(topic)
    client.on_message = on_message

    global lastms
    nowms = round(time.time() * 1000)
    print("Last Value: " , type(lastms))
    print ("New Value: " , type(nowms))
    if (nowms - lastms > 120000):
        lastms = round(time.time() * 1000)
        publish(client)

def publish(client):
    #msg_count = 0
    msg0 = f"ENCENDIDO"
    result = client.publish(topic, msg0)
    result = client.publish(topicState, msgState)
    status = result[0]
    if status == 0:
        print(f"Send `{msg0}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")

def firstms():
    global lastms
    lastms = round(time.time() * 1000)
    return lastms

def run():
    firstms()
    client = connect_mqtt()
    publish(client)
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

Solution

  • The subscribe function will only run once; so your code that checks the time (e.g. if (nowms - lastms > 120000):) only runs once. If you moved this up into the callback the code might work:

    def subscribe(client: mqtt_client):
        def on_message(client, userdata, msg):
            print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
            message = msg.payload.decode()        
    
            global lastms
            nowms = round(time.time() * 1000)
            print("Last Value: " , type(lastms))
            print ("New Value: " , type(nowms))
            if (nowms - lastms > 120000):
                lastms = round(time.time() * 1000)
                publish(client)
    
            if message == "OFF":
                client.publish(topicState, "OFF")
                sshClient = SSHClient()
                sshClient.load_system_host_keys()
                sshClient.set_missing_host_key_policy(AutoAddPolicy())
                sshClient.connect('192.168.15.220', username= 'root', password= 'Caro1992')
                stdin, stdout, stderr = sshClient.exec_command('powerdown')
    
    
        client.subscribe(topic)
        client.on_message = on_message    
    

    However there are a number of issues with this:

    • The value of msgState is never changed
    • The publish only runs when a message is received (which may, or may not, be OK in your use-case).
    • Calling publish from a callback is not a good idea (see this answer)

    A better solution is to use the threaded interface to the network loop (see this section of the readme and this example). The following is untested pseudo code to provide you with a starting point:

    import asyncio
    import paho.mqtt.client as mqtt_client
    
    def on_connect(client, userdata, flags, rc):
        print("Connection returned result: "+connack_string(rc))
        client.subscribe(topic)
    
    def on_message(client, userdata, message):
        # Do something with the message
        print("Received message '" + str(message.payload) + "' on topic '"
            + message.topic + "' with QoS " + str(message.qos))
    
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect # Do the subscribe etc in the callback
    client.on_message = on_message
    client.connect(broker, port)
    client.loop_start() # Start network loop in separate thread
    
    while True: # infinite loop 
        await asyncio.sleep(120) # Pause for two minutes
        print("Publishing")
        client.publish(topic, msg0)
        client.publish(topicState, msgState)
    

    Another way of accomplishing this is to call loop regularly to process network events (but in this case that seems a little more complex and comments in the code recommend against using this on its own).