I have a very simple FastAPI
python server:
import io
import os
import sys
import json
import time
from PIL import Image
import paho.mqtt.client as mqtt
from fastapi import FastAPI, File, HTTPException, UploadFile, Form
# Initialize FastAPI
app = FastAPI()
# Initialize ENV variables
args = {
'broker' : os.environ.get('BROKER', '127.0.0.1'),
'port' : int(os.environ.get('PORT', '1883')),
'topic' : os.environ.get('TOPIC', 'topic')
}
# Initialize MQTT
print('Connecting to MQTT broker {}:{}.'.format(args['broker'], args['port']), flush=True)
mqtt_client = mqtt.Client(args['model'])
mqtt_client.connect(args['broker'], args['port'])
@app.get("/")
async def info():
return "Send a POST request to / with an image\nWill publish results to topic {}".format(args['topic'])
@app.post("/")
async def run(image: UploadFile = File(...)):
try:
start = time.time()
# Read request data
contents = await image.read()
image = Image.open(io.BytesIO(contents))
# Do something with the image
results = ["todo"]
print('Process took {} seconds'.format(time.time() - start), flush=True)
# Publish to MQTT topic
print('Publish to MQTT {}'.format(args['topic']), flush=True)
(rc, mid) = mqtt_client.publish(args['topic'], json.dumps(results), qos=2)
print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))
#if not rc == mqtt.MQTT_ERR_SUCCESS: print("Code {} while sending message {}: {}".format(rc, mid, mqtt.error_string(rc)))
# Format response
data = {}
data['res'] = results
data['count'] = len(results)
data['success'] = True
return data
except:
e = sys.exc_info()[1]
print('Python error with no Exception handler:')
print('Traceback error: {}'.format(e))
raise HTTPException(status_code=500, detail=str(e))
My aim is to publish the results both on HTTP response and on MQTT topic. The MQTT connection seems to be working.
When I send a request to the web server, the following logs appear:
INFO: Will watch for changes in these directories: ['/app']
INFO: Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO: Started reloader process [1] using statreload
Connecting to MQTT broker 192.168.1.201:1883.
INFO: Started server process [7]
INFO: Waiting for application startup.
INFO: Application startup complete.
Process took 0.025616168975830078 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 1: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.023961544036865234 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 2: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
Running TensorFlow interpreter on image
Process took 0.031525611877441406 seconds
Publish to MQTT topic/mytopic
Code 0 while sending message 3: No error.
INFO: 10.42.6.1:39480 - "POST / HTTP/1.1" 200 OK
The publish seems to go fine but I can't receive any message (whereas it works directly from command line).
The server is started using command:
uvicorn server:app --reload --port 80 --host 0.0.0.0
How to run publish
from another thread ?
You haven't started the MQTT client network loop.
You should probably add mqtt_client.loop_start()
after the call to mqtt_client.connect()