Search code examples
pythonpostgresqlmqttpsycopg2paho

MQTT & Postgresql DB: Insert a mqtt message into a column in postgresql table using python


I am new to PostgreSQL and I want to insert mqtt messages to PostgreSQL database using pysycopg2. Unfortunately, it is not working as expected. I think it is a simple error but can't figure out what the error exactly is. First I have published mqtt messages in mosquitto broker using a python script[1] and then subscribed from another script [2] and tried to store into postgresql. The corresponding error message is shown in [3].

Here is my Publisher script for publishing fake mqtt-json data to mosquitto broker:

#!/usr/bin/python

import paho.mqtt.client as mqtt
import numpy as np
import time

broker_address = "localhost"

def on_connect(client, userdata, flags, rc):
  print("Connected with result code " + str(rc))

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()

while True:
  time.sleep(0.05)
  degrees = np.random.random_sample()
  toa = np.random.random_sample()
  humidity = np.random.random_sample()
  json =  ('''[{"time": "2020-04-01 21:00:00",  "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]''')

  locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'

  locpk= str(locpk)
  json= str(json)
  client.publish("device14/geo", locpk, 1, 1)
  client.publish("device14/geo", json, 1, 1)

Here is my subscriber script to subscribe published messages and insert into PostgreSQL:

#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("device14/geo",0)

def on_message(client, userdata, msg):
    Date = datetime.datetime.utcnow()
    message= msg.payload.decode()
    try:
            #print the JSON Message with Data and Topic
            print(str(Date) + ": " + msg.topic + " " + str(message))
            #concatenate the SQL string
            sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
            #execute the INSERT statement
            cur = conn.cursor()
            cur.execute(sql_string)
            #commit the changes to the database
            conn.commit()
            print("Finished writing to PostgreSQL")
    except (Exception, Error) as err:
            print("\npsycopg2 connect error:", err)
            #print("Could not insert " + message + " into Postgresql DB")

#Set up a client for Postgresql DB
try:
    #read connection parameters
    params = config()
    #connect to the PostgreSQL server
    print('Connecting to the PostgreSQL database...')
    conn = psycopg2.connect(**params)
    #create a cursor
    cur = conn.cursor()
    #execute a statement
    print('PostgreSQL database version:')
    cur.execute('SELECT version()')
    cur.execute(sql)
    #display the PostgreSQL database server version
    db_version = cur.fetchone()

    print(db_version)

except (Exception, psycopg2.DatabaseError) as error:
    print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()

#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)

client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()

Error:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py


Connecting to the PostgreSQL database...

PostgreSQL database version:

no results to fetch

Connected with result code 0

Received a message on topic: device14/geo


2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00",   "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]


psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
               ^**


Received a message on topic: device14/geo

2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}

psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...

Looking forward to your comments. Any help would be greatly appreciated.

PS: I have also tried changing the publishing message structures (i.e. locpk, json) but didn't help. If you have any suggestions on how the published message structure should be please let me know. I will try it.


Solution

  • I don't see the table structure for table_name but if it only has one column (column_name) and you want to store a JSON document in it you need to define it as jsonb in PostgreSQL.
    Inserting the data in such a column is easy:

    from psycopg2.extras import Json
    ...
    query = "INSERT INTO table_name(column_name) VALUES (%s)"
    data = (Json(message),)
    cur.execute(query, data)
    conn.commit()
    ...
    

    However, using a single column for the whole message is not a good design choice.
    Create columns for common keys such as time, device_addr, latitude, longitude, altitude and others (I'm just guessing here based on the data provided).
    Store less important (potentially missing) keys in a separate jsonb column (called data for example).