I am new to PostgreSQL and I want to insert mqtt messages to PostgreSQL database using pysycopg2. Unfortunately, it is not working as expected. I think it is a simple error but can't figure out what the error exactly is. First I have published mqtt messages in mosquitto broker using a python script[1] and then subscribed from another script [2] and tried to store into postgresql. The corresponding error message is shown in [3].
Here is my Publisher script for publishing fake mqtt-json data to mosquitto broker:
#!/usr/bin/python
import paho.mqtt.client as mqtt
import numpy as np
import time
broker_address = "localhost"
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()
while True:
time.sleep(0.05)
degrees = np.random.random_sample()
toa = np.random.random_sample()
humidity = np.random.random_sample()
json = ('''[{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]''')
locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'
locpk= str(locpk)
json= str(json)
client.publish("device14/geo", locpk, 1, 1)
client.publish("device14/geo", json, 1, 1)
Here is my subscriber script to subscribe published messages and insert into PostgreSQL:
#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("device14/geo",0)
def on_message(client, userdata, msg):
Date = datetime.datetime.utcnow()
message= msg.payload.decode()
try:
#print the JSON Message with Data and Topic
print(str(Date) + ": " + msg.topic + " " + str(message))
#concatenate the SQL string
sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
#execute the INSERT statement
cur = conn.cursor()
cur.execute(sql_string)
#commit the changes to the database
conn.commit()
print("Finished writing to PostgreSQL")
except (Exception, Error) as err:
print("\npsycopg2 connect error:", err)
#print("Could not insert " + message + " into Postgresql DB")
#Set up a client for Postgresql DB
try:
#read connection parameters
params = config()
#connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
#create a cursor
cur = conn.cursor()
#execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')
cur.execute(sql)
#display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
try:
client.connect("localhost", 1883, 60)
connOK = True
except:
connOK = False
time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()
Error:
/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py
Connecting to the PostgreSQL database...
PostgreSQL database version:
no results to fetch
Connected with result code 0
Received a message on topic: device14/geo
2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", "FCntUp": "7281", "CF":"867900000", "BW":"125000", "SF":"10", "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec": "245244546", "offset":"4184", "Uncertainty": "7816", "Offset Uncertainty":"201.17" ,"device EUI":"ruzfv276gz2v23g", "Id":"0" , "Latitude": "30.347834" , "Longitude":"20.34763", " Altitude":"500","MIC":"87hiub87"}]
psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
^**
Received a message on topic: device14/geo
2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}
psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...
Looking forward to your comments. Any help would be greatly appreciated.
PS: I have also tried changing the publishing message structures (i.e. locpk, json) but didn't help. If you have any suggestions on how the published message structure should be please let me know. I will try it.
I don't see the table structure for table_name
but if it only has one column (column_name
) and you want to store a JSON document in it you need to define it as jsonb
in PostgreSQL.
Inserting the data in such a column is easy:
from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
data = (Json(message),)
cur.execute(query, data)
conn.commit()
...
However, using a single column for the whole message is not a good design choice.
Create columns for common keys such as time, device_addr, latitude, longitude, altitude
and others (I'm just guessing here based on the data provided).
Store less important (potentially missing) keys in a separate jsonb column (called data
for example).