Tags: python, request, mqtt, blocking, nonblocking

Python request blocks publishing of MQTT messages


I am developing a program which gets data from two sources:

  • incoming MQTT messages (paho) from a frontend, and
  • data from a REST API (requests).

An incoming MQTT message, caused by a user input in the frontend, triggers the request to the REST API.

Furthermore, the program sends data over MQTT to the frontend at an interval of one second, using the same connection/client.

Edit: An alternative to this "push" approach could be to request the data from the frontend side when the frontend session is started/reloaded, and afterwards only push data from the backend when it has changed. But this does not fix the blocking behavior of the request.

Following is a code snippet:

import time
import logging

def main():
    # Connect to MQTT broker
    mqtt = mqttClient()

    testClass = TestClassName(mqtt.client)

    try:
        # Start the mqtt client loop
        mqtt.client.loop_start()  # the callback for the triggering messages runs in this loop

        # publish every 1 second
        while True:
            time.sleep(1)
            testClass.uiPublish()  # send data back to frontend

    except KeyboardInterrupt:
        logging.debug("Program stopped by user")
        mqtt.client.loop_stop()
        mqtt.client.disconnect()
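For context, main() assumes a small mqttClient wrapper class that connects to the broker and exposes the underlying paho client. A minimal sketch of such a wrapper (the broker address/port and the constructor style are assumptions, not part of the original code):

import paho.mqtt.client as paho

class mqttClient:
    def __init__(self):
        # paho-mqtt 1.x style constructor; broker address and port are placeholders
        self.client = paho.Client()
        self.client.connect("localhost", 1883)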

The class TestClassName also contains the callbacks for the MQTT messages and the publishing function uiPublish(). In one of these callbacks, the function that makes the request to the API is triggered.

The data that is sent is an attribute of the class TestClassName, and it is updated by the API request. So both functions need access to this attribute.
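A rough skeleton of TestClassName as described (a minimal sketch; the frontend topic name and payload format are assumptions, only self.data, uiPublish() and the constructor argument are taken from the snippets above):

import json

class TestClassName:
    def __init__(self, mqttClient):
        self.mqttClient = mqttClient
        self.data = {}  # updated by the API request, read by uiPublish()

    def uiPublish(self):
        # send the current state back to the frontend (topic name assumed)
        self.mqttClient.publish("frontend/data", json.dumps(self.data))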

# the class TestClassName contains this function
def subscribeMqtt(self, topic):
    self.mqttClient.subscribe(topic)
    self.mqttClient.message_callback_add(topic, self.dotherequest)

In general it works, but there is a problem: the request in dotherequest to the API blocks the sending of the MQTT messages in the while loop in main(). The request takes some time (up to 5 seconds), and this also leads to a disconnect/reconnect to the MQTT broker. So the frontend is outdated until the end of the request, and the reconnect to the broker is not a nice solution either. The frontend should keep getting the "old" information as long as the request is running. At the end of the request the attribute is updated, and at most 1 second later the data is sent to the frontend.

I tried multiple approaches to handle this problem, but I think I have some general problems understanding the principles behind multi-threading/async in Python. I tried to set up the function which makes the request:

  • async with aiohttp,
  • in an extra thread with threading, and
  • async with a ThreadPool.

None of these approaches changed the blocking of the MQTT publishing. I think the reason could be that in the end I have to wait for the request function to finish. I am not parallelizing the same function multiple times; I am trying to parallelize two complete sections of the program.

Any ideas/recommendations on how I can solve this issue?


Solution

  • So, I think I understand the issue and found a solution by adding a new thread, as recommended in the comments.

    Below is an example of how I implemented the solution.

    Subscribe to the MQTT topic and register self.dotherequest as the callback:

    # the class TestClassName contains this function
    def subscribeMqtt(self, topic):
        self.mqttClient.subscribe(topic)
        self.mqttClient.message_callback_add(topic, self.dotherequest)
    

    In this function the new thread for the request is created and started. The request itself is located in the function self.functionWithRequest. The lock is used to protect the object's attribute at the moment the data is written to it (in case multiple requests are running simultaneously), i.e. to avoid a race condition. For this to work, the same lock instance has to be shared by all request threads, so it is created once per object (e.g. in __init__, see the sketch after the next snippet) rather than inside the callback.

    def dotherequest(self, client, userdata, msg):
        payload = json.loads(msg.payload.decode())
        # self.lock is a single threading.Lock shared by all request threads
        # (created once, e.g. in __init__); a new lock per call would not
        # protect the attribute against concurrent writes
        # new thread for the request (I/O)
        t1 = threading.Thread(target=self.functionWithRequest, args=(payload, self.lock))
        t1.start()
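    Since all request threads have to share one lock instance, it is created once per object together with the data attribute. A minimal sketch of the relevant part of __init__ (the actual constructor is not shown in the original code, so treat this as an assumption):

    def __init__(self, mqttClient):
        self.mqttClient = mqttClient
        self.data = {}                  # attribute read by uiPublish()
        self.lock = threading.Lock()    # single lock shared by all request threads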
    

    This is the function in which the request happens:

    def functionWithRequest(self, payload, lock):
        headers = {"Content-Type": "application/json"}
        data = {
            "datapoint1": "data1",
            "datapoint2": "data2",
            "datapoint3": "data3"
            }

        response = requests.post(os.getenv('requestUrl'), json=data, headers=headers, verify='ca.pem', timeout=15)

        if response.status_code == 200:
            load = json.loads(response.text)
            # write the shared attribute under the lock
            lock.acquire()
            self.data = load['dataFromRequest']
            lock.release()
        else:
            logging.error('Request NOT successful. Error code {0}.'.format(response.status_code))
    

    As I understand the difference between threads and processes in Python: threads share the same interpreter and its GIL, so the GIL keeps switching between the threads and, in effect, only one thread executes Python code at a time, as if everything ran on the single CPU core the one Python interpreter is running on. One advantage is that all threads can share the same variables/data, and while a thread is blocked on I/O (like the request here) the GIL is released, so the other threads keep running.
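    A small, MQTT-free sketch that illustrates this point: the worker thread blocks on a slow operation (time.sleep here stands in for the request; like network I/O, it releases the GIL) while the main loop keeps reading the shared attribute once per second.

    import threading
    import time

    class Demo:
        def __init__(self):
            self.data = "old"
            self.lock = threading.Lock()

        def slow_update(self):
            time.sleep(5)                # stands in for the blocking request
            with self.lock:
                self.data = "new"

    demo = Demo()
    threading.Thread(target=demo.slow_update).start()

    for _ in range(7):                   # stands in for the publish loop
        time.sleep(1)
        with demo.lock:
            print(demo.data)             # prints "old" until the worker finishes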

    Processes can run on multiple CPU cores with multiple Python interpreters, which makes it much harder to synchronize data after the processes have finished. The advantage is more computational power.
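    For comparison, sharing data between processes requires explicit mechanisms such as multiprocessing.Value or a Queue. A minimal sketch (not part of the original solution):

    import multiprocessing

    def worker(shared):
        shared.value = 42                          # written in the child process

    if __name__ == "__main__":
        shared = multiprocessing.Value('i', 0)     # explicit shared memory
        p = multiprocessing.Process(target=worker, args=(shared,))
        p.start()
        p.join()
        print(shared.value)                        # 42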