I have a docker compose network with three containers: a Directus instance, a Flask app, and a Mosquitto broker.
The Directus instance and the Flask app are communicating successfully. But I can't seem to make the Flask app and Mosquitto message each other. They connect (and disconnect after a timeout), but there is no sign of a message to or from the broker.
Theoretically, after they have connected, triggering the webhook should result in one message being sent to the broker and another being sent back from the broker. But while the docker compose logs show that the webhook was received, nothing else happens.
Are flask and paho-mqtt not compatible?
I have also observed that, depending on their position in the Python script, some print statements produce output and some do not.
Here is the docker compose file:
services:
  directus:
    image: directus/directus:9.22.1
    ports:
      - "8055:8055"
    volumes:
      - ./directus/database:/directus/database
      - ./directus/extensions:/directus/extensions
      - ./directus/uploads:/directus/uploads
    environment:
      KEY: "0fda9121-269d-44bd-91f2-4ff05be14b4b"
      SECRET: "86cb4a76-a4c6-4ce7-8181-b0445d95675c"
      DB_CLIENT: "sqlite3"
      DB_FILENAME: "/directus/database/data.db"
      ADMIN_EMAIL: "admin@example.com"
      ADMIN_PASSWORD: "d1r3ctu5"
      CORS_ENABLED: true
  app:
    build: ./app
    ports:
      - "8080:8080"
    depends_on:
      - mosquitto
  mosquitto:
    image: eclipse-mosquitto
    volumes:
      - ./app/config/mosquitto.conf:/mosquitto/config/mosquitto.conf
Here is the Dockerfile for the app:
# syntax=docker/dockerfile:1
FROM python:3.10-alpine
WORKDIR /code
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["python3", "app.py"]
And here is app.py:
from flask import Flask
import paho.mqtt.client as mqtt

app = Flask(__name__)

client_a = mqtt.Client("Client", protocol=mqtt.MQTTv31)
client_a.connect(host="mosquitto")
print("Hi") # shows up in docker logs

def on_message(client, userdata, message):
    print("received message: ", str(message.payload.decode("utf-8"))) # does not show up in docker logs
    return "", 200

@app.route('/webhook', methods=['POST'])
def webhook():
    message = "An new entry was created!"
    print(message) # does not show up in the docker logs
    client_a.publish("DIRECTUS", message)
    return "Erfolg", 200

client_a.subscribe("DIRECTUS")
client_a.loop_start()
print("Hi") # does show up in docker logs (but only once)
client_a.on_message = on_message
client_a.loop_stop()

app.run(host='app', port=8080, debug=True, use_reloader=False)
In the docker logs I see all three containers starting:
[+] Running 3/0
✔ Container examples-directus-1 Created 0.0s
✔ Container examples-mosquitto-1 Created 0.0s
✔ Container examples-app-1 Recreated 0.0s
Attaching to examples-app-1, examples-directus-1, examples-mosquitto-1
examples-mosquitto-1 | 1684508456: mosquitto version 2.0.15 starting
examples-mosquitto-1 | 1684508456: Config loaded from /mosquitto/config/mosquitto.conf.
examples-mosquitto-1 | 1684508456: Opening ipv4 listen socket on port 1883.
examples-mosquitto-1 | 1684508456: Opening ipv6 listen socket on port 1883.
examples-mosquitto-1 | 1684508456: mosquitto version 2.0.15 running
examples-mosquitto-1 | 1684508456: New connection from 172.20.0.4:40961 on port 1883.
examples-mosquitto-1 | 1684508456: New client connected from 172.20.0.4:40961 as Client (p1, c1, k60).
examples-app-1 | Hi
examples-app-1 | Hi
examples-app-1 | * Serving Flask app 'app'
examples-app-1 | * Debug mode: on
examples-app-1 | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
examples-app-1 | * Running on http://app:8080
examples-app-1 | Press CTRL+C to quit
examples-directus-1 | [15:00:57.942] INFO: Initializing bootstrap...
examples-directus-1 | [15:00:57.974] INFO: Database already initialized, skipping install
examples-directus-1 | [15:00:57.974] INFO: Running migrations...
examples-directus-1 | [15:00:57.977] INFO: Done
examples-directus-1 | [15:01:00.639] WARN: PUBLIC_URL should be a full URL
examples-directus-1 | [15:01:00.672] WARN: Spatialite isn't installed. Geometry type support will be limited.
examples-directus-1 | [15:01:00.730] INFO: Server started at http://0.0.0.0:8055
examples-directus-1 | [15:01:00.739] WARN: Update available: 9.22.1 -> 10.1.1
examples-directus-1 | [15:01:13] GET / 302 5ms
When I trigger the webhook, only one message appears in the logs:
examples-app-1 | 172.20.0.2 - - [19/May/2023 15:01:18] "POST /webhook HTTP/1.1" 200
Mosquitto seems completely unaffected. But after a while, the connection to the app times out, so the broker does know the app is there:
examples-mosquitto-1 | 1684507003: Client Client has exceeded timeout, disconnecting.
Edit
Thank you for your help. I now understand that I ended the thread of the MQTT client by calling loop_stop() (I thought it worked more like the curly braces opening and closing a loop).
Sadly, I still don't see any indication of successful communication between the Flask app and the broker. So here is my updated code, since I can't seem to find the mistake on my own.
import json
from flask import Flask
from flask_mqtt import Mqtt
from flask_socketio import SocketIO

app = Flask(__name__)
socketio = SocketIO(app)

app.config['MQTT_BROKER_URL'] = 'mosquitto'
app.config['MQTT_BROKER_PORT'] = 1883 # default port for non-tls connection
app.config['MQTT_KEEPALIVE'] = 5 # set the time interval for sending a ping to the broker to 5 seconds
app.config['MQTT_TLS_ENABLED'] = False

mqtt = Mqtt(app)

@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    print(data) # not printed in docker logs

@socketio.on('publish')
def handle_publish(json_str):
    data = json.loads(json_str)
    mqtt.publish(data['topic'], data['message'])

@socketio.on('subscribe')
def handle_subscribe(json_str):
    data = json.loads(json_str)
    mqtt.subscribe(data['topic'])

@app.route('/webhook', methods=['POST'])
def webhook():
    message = "A new entry was created!"
    print(message) # does not show up in the docker logs
    result = mqtt.publish('DIRECTUS', bytes(message, 'utf-8'))
    print(result)
    return "Erfolg", 200

@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
    mqtt.subscribe('DIRECTUS')

@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
    print('LOG: {}'.format(buf))

socketio.run(app, host='app', port=8080, use_reloader=False, debug=True, allow_unsafe_werkzeug=True)
EDIT 2: By using the MQTT logger I found out that messages are sent successfully between my Mosquitto broker and my Flask app. Strangely, the output from the print calls in the MQTT handlers does not appear in the docker logs. Only logger messages of priority "warning" or higher appear.
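One possible explanation, offered as an assumption rather than a confirmed diagnosis: when Python's stdout is not attached to a terminal (as is typically the case in a container), print output is block-buffered and may sit in the buffer indefinitely, whereas logging output goes to stderr and shows up right away. A minimal sketch of a workaround follows; the helper name `log_line` is made up for illustration.

```python
import sys


def log_line(text, stream=None):
    """Print a line and flush at once so it is not held back in a buffer."""
    # Resolve sys.stdout at call time so a redirected stdout is respected.
    stream = sys.stdout if stream is None else stream
    print(text, file=stream, flush=True)


# Inside an MQTT or Flask handler one would call, for example:
# log_line("A new entry was created!")
```

Alternatives that need no code change would be to start the interpreter with `python3 -u app.py` or to set the environment variable `PYTHONUNBUFFERED=1` for the app service. The observation that only "warning" or higher appears would also fit the default level of Python's root logger, which is WARNING.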
EDIT 3: In case it is useful for someone, here is a follow-up question regarding the logging/printing: "printing in dockered flask mqtt app doesn't work as intended".
Paho MQTT is compatible with Flask but you have to use it correctly.
Your code reads:
client_a.subscribe("DIRECTUS")
client_a.loop_start()
print("Hi") # does show up in docker logs (but only once)
client_a.on_message = on_message
client_a.loop_stop()
You're calling client_a.loop_stop() almost immediately after starting the client. Running the loop is necessary for Paho to manage MQTT keepalives and incoming and outgoing messages. If the loop's not running you can't expect Paho MQTT to work.
You might prefer to use Flask MQTT - it's a wrapper around Paho MQTT that simplifies its use with Flask.
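If you want to stay with plain Paho, here is a minimal sketch of the startup order described above. It assumes paho-mqtt 1.x, as the question's `mqtt.Client("Client", ...)` call suggests (the constructor and callback signatures changed in 2.x). The helper names `start_mqtt`, `create_app`, and `main` are made up for illustration. The callbacks are attached before connecting, the subscription happens in `on_connect`, and `loop_stop()` is never called while the app is serving.

```python
TOPIC = "DIRECTUS"


def on_connect(client, userdata, flags, rc):
    # Subscribing here renews the subscription after every (re)connect.
    client.subscribe(TOPIC)


def on_message(client, userdata, message):
    # flush=True so the line is written to the container logs right away.
    print("received message:", message.payload.decode("utf-8"), flush=True)


def start_mqtt(client, host="mosquitto"):
    """Attach callbacks, connect, and leave the network loop running."""
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host=host)
    client.loop_start()  # background thread; no loop_stop() afterwards
    return client


def create_app(client):
    # Imported lazily so the MQTT helpers above can be used without Flask.
    from flask import Flask

    app = Flask(__name__)

    @app.route("/webhook", methods=["POST"])
    def webhook():
        client.publish(TOPIC, "A new entry was created!")
        return "Erfolg", 200

    return app


def main():
    # Assumption: paho-mqtt 1.x constructor, as used in the question.
    import paho.mqtt.client as mqtt

    client_a = start_mqtt(mqtt.Client("Client", protocol=mqtt.MQTTv31))
    create_app(client_a).run(host="app", port=8080, debug=True, use_reloader=False)
```

Calling `main()` at the bottom of app.py would start both the MQTT loop and the Flask server. The loop thread keeps running for as long as the process does, which is what lets keepalives and the `DIRECTUS` messages flow.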