I'm building an API using FastAPI and Paho-MQTT. The API queries data from the database and sends it to an Android app while, in the background, it receives data from some devices through MQTT and saves it to MongoDB. The problem is the following: I can't receive data from MQTT in the background because doing so blocks the API startup. I'd like to use the API routes normally while the background task continuously receives and saves the MQTT data.
I have an app.py file, which is the main file, and a mqtt.py file that handles incoming MQTT data.
I tried two approaches, both in app.py: one in the startup function and one in the if __name__ == "__main__" block.
# app.py
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(mqtt_subscribe())  # Stops the startup flow when I try to run using create_task

# Some routes here...
@app.route("/")
async def root():
    return {"hello": "world"}

if __name__ == "__main__":
    mqtt_subscribe()  # When I try running before uvicorn.run, doesn't start the subscription
    uvicorn.run(app, host="0.0.0.0", port=8000)
# mqtt.py
client = motor.motor_asyncio.AsyncIOMotorClient(URL)
db = client.collection

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    # Set Connecting Client ID
    client = mqtt_client.Client(CLIENT_ID)
    # Set CA certificate
    client.tls_set(ca_certs=ROOT_CA_PATH)
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.connect(BROKER, PORT)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        # Perform necessary operations with the received data
        payload = msg.payload.decode('utf-8')
        data = json.loads(payload)
        result = db["devices"].insert_one(data)
        print("Document inserted! ID:", result.inserted_id)

    client.subscribe(TOPIC, qos=0)
    client.on_message = on_message

async def mqtt_subscribe():
    # Set up the MQTT client
    print("function subscribe")
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()
Output with asyncio.create_task(mqtt_subscribe()) in the startup function:
INFO: Started server process [1126]
INFO: Waiting for application startup.
function subscribe
Connected to MQTT Broker!
As you can see, the "INFO: Application startup complete." line never appears, so I can't use the API routes.
Thanks in advance for your help!
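The log stops before startup completes because client.loop_forever() is a blocking call. Although mqtt_subscribe() is declared async, it never awaits anything, so once the task starts running it holds the event loop and the rest of startup cannot proceed. The following is a minimal, stdlib-only sketch of the general remedy, which is to move the blocking call to a worker thread. Here blocking_loop is a hypothetical stand-in for loop_forever(), not paho code, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import threading


def blocking_loop(stop: threading.Event) -> str:
    """Hypothetical stand-in for client.loop_forever(): blocks until told to stop."""
    stop.wait()
    return "stopped"


async def main():
    stop = threading.Event()
    # Run the blocking call in a worker thread so the event loop stays free.
    task = asyncio.create_task(asyncio.to_thread(blocking_loop, stop))

    ticks = 0
    for _ in range(3):
        # The loop keeps scheduling other work (e.g. request handlers)
        # while the worker thread is blocked.
        await asyncio.sleep(0)
        ticks += 1

    stop.set()  # ask the worker to finish
    return ticks, await task


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_loop were called directly inside main() instead, the loop would never reach the code that sets the stop event, which mirrors the stalled startup in the log above.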
You can run the MQTT loop in a background thread, which is what paho's loop_start() does (loop_forever() blocks the calling thread instead). Rather than relying on the startup
event I would probably do something like this:
import json

from fastapi import FastAPI
import paho.mqtt.client as mqtt

# Most recent value received over MQTT; written by the MQTT thread, read by the route
latest_mqtt_value = None

def on_message(client, userdata, message):
    global latest_mqtt_value
    payload = json.loads(message.payload)
    latest_mqtt_value = payload['value']

def create_app():
    app = FastAPI()

    # Note: paho-mqtt 2.x expects a callback API version as the first argument,
    # e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client = mqtt.Client()
    client.connect('localhost', 1883)
    client.subscribe('topic')
    client.on_message = on_message
    # loop_start() runs the network loop in a background thread and returns immediately
    client.loop_start()

    @app.get("/")
    async def root():
        return {"value": latest_mqtt_value}

    return app

app = create_app()
This is a runnable example. Put it in a file named example.py and run it like this:
uvicorn example:app
Then publish a message to the topic named topic with the payload {"value": 123}:
mosquitto_pub -t topic -m '{"value": 123}'
A subsequent fetch of http://localhost:8000 will return:
{"value": 123}
Etc.
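One point to watch when adapting this to the code in the question: paho invokes on_message on its own network thread, while Motor's insert_one returns an awaitable that has to run on the asyncio event loop, so result.inserted_id is not available without awaiting it. A possible way to bridge the two is asyncio.run_coroutine_threadsafe. The sketch below is stdlib-only and makes several assumptions: make_on_message, fake_save and demo are hypothetical names, fake_save stands in for the real database insert, and a SimpleNamespace stands in for the paho message object.

```python
import asyncio
import json
from types import SimpleNamespace


def make_on_message(loop, save):
    """Build a paho-style on_message callback that hands documents to `loop`.

    `save` is any coroutine function taking one dict (hypothetical; in the
    question's setup it could wrap the MongoDB insert).
    """
    def on_message(client, userdata, msg):
        data = json.loads(msg.payload.decode("utf-8"))
        # Called from the MQTT thread: schedule the coroutine on the event
        # loop rather than awaiting it here. Returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(save(data), loop)

    return on_message


async def demo():
    saved = []

    async def fake_save(doc):  # stand-in for the real database insert
        saved.append(doc)

    on_message = make_on_message(asyncio.get_running_loop(), fake_save)
    msg = SimpleNamespace(payload=b'{"value": 123}')

    def deliver():
        # Runs in a worker thread, as paho's network loop would, and waits
        # for the event loop to finish the scheduled save.
        on_message(None, None, msg).result(timeout=5)

    await asyncio.to_thread(deliver)
    return saved


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a FastAPI app the loop would typically be captured with asyncio.get_running_loop() inside a startup handler, and the resulting callback assigned to client.on_message before calling client.loop_start().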