Search code examples
pythonmqttpahouvicorn

Paho MQTT on uvicorn server


I have a very simple FastAPI python server:

import io
import os
import sys
import json
import time
from PIL import Image
import paho.mqtt.client as mqtt
from fastapi import FastAPI, File, HTTPException, UploadFile, Form

# Initialize FastAPI
app = FastAPI()

# Initialize ENV variables
args = {
    'broker'    : os.environ.get('BROKER', '127.0.0.1'),
    'port'      : int(os.environ.get('PORT', '1883')),
    'topic'     : os.environ.get('TOPIC', 'topic')
}
    
# Initialize MQTT
print('Connecting to MQTT broker {}:{}.'.format(args['broker'], args['port']), flush=True)
mqtt_client = mqtt.Client(args['model'])
mqtt_client.connect(args['broker'], args['port'])

@app.get("/")
async def info():
    return "Send a POST request to / with an image\nWill publish results to topic {}".format(args['topic'])
    
@app.post("/")
async def run(image: UploadFile = File(...)):
    try:
        start = time.time()
        
        # Read request data
        contents = await image.read()
        image = Image.open(io.BytesIO(contents))

        # Do something with the image
        results = ["todo"]
        print('Process took {} seconds'.format(time.time() - start), flush=True)
        
        # Publish to MQTT topic
        print('Publish to MQTT {}'.format(args['topic']), flush=True)
        (rc, mid) = mqtt_client.publish(args['topic'], json.dumps(results), qos=2)
        print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))
        #if not rc == mqtt.MQTT_ERR_SUCCESS: print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))

        # Format response
        data = {}
        data['res'] = results
        data['count'] = len(results)
        data['success'] = True
        return data
    except:
        e = sys.exc_info()[1]
        print('Python error with no Exception handler:')
        print('Traceback error: {}'.format(e))
        raise HTTPException(status_code=500, detail=str(e))

My aim is to publish the results both on HTTP response and on MQTT topic. The MQTT connection seems to be working.

When I send a request to the web server, the following logs appear:

INFO:     Will watch for changes in these directories: ['/app']
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO:     Started reloader process [1] using statreload    
Connecting to MQTT broker 192.168.1.201:1883.
INFO:     Started server process [7]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Process took 0.025616168975830078 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 1: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.023961544036865234 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 2: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.031525611877441406 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 3: No error.
INFO:     10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK

The publish seems to go fine but I can't receive any message (whereas it works directly from command line).

The server is started using command:

uvicorn server:app --reload --port 80 --host 0.0.0.0

How to run publish from another thread ?


Solution

  • You haven't started the MQTT client network loop.

    You should probably add mqtt_client.loop_start() after the call to mqtt_client.connect()