Tags: python, multithreading, flask, mqtt, iot

How to log IoT data at specific time intervals using Flask/Python


This is my first question here, so please pardon any mistakes. I am trying to develop software using Python/Flask that continuously receives IoT data from multiple devices and logs the data at specific time intervals. For example, I have 3 devices and log data at 30-second intervals: device1, device2 and device3 send their first data at 5:04:20, 5:04:29 and 5:04:31 respectively. These devices then continuously send data every 1 or 2 seconds. I want to keep track of the latest data and make sure the next value is logged at 5:04:50, 5:04:59 and 5:05:01 respectively, then at 5:05:20, 5:05:29 and 5:05:31, and so on.
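The per-device schedule described above is simple arithmetic: each device's log slots are its first timestamp plus whole multiples of the interval, so every device keeps its own phase. A minimal sketch, using the times from the example; `next_log_time` is a hypothetical helper name, not part of any library:

```python
from datetime import datetime, timedelta


def next_log_time(first_seen: datetime, now: datetime, interval_s: int = 30) -> datetime:
    """Return the first slot first_seen + k*interval that is strictly after `now`.

    Each device's schedule is anchored at the time of its first message.
    """
    interval = timedelta(seconds=interval_s)
    # Number of whole intervals completed since the first message (timedelta // timedelta -> int).
    k = (now - first_seen) // interval
    return first_seen + (k + 1) * interval


# Example from the question: first messages at 5:04:20, 5:04:29 and 5:04:31.
day = datetime(2021, 8, 28)
first = {
    "device1": day.replace(hour=5, minute=4, second=20),
    "device2": day.replace(hour=5, minute=4, second=29),
    "device3": day.replace(hour=5, minute=4, second=31),
}
for name, t0 in first.items():
    print(name, next_log_time(t0, t0).strftime("%H:%M:%S"))
```

This prints 05:04:50, 05:04:59 and 05:05:01 for the three devices; calling it again with a later `now` yields 05:05:20 and so on for device1.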

I have written a script that does this for a single device using threading. Here is the code:

import paho.mqtt.client as mqtt
import csv
import os
import datetime
import threading
import queue
import time
q = queue.Queue()
header = ["Date", "Time", "Real_Speed"]
Rspd_key_1 = "key1="
Rspd_key_2 = "key2="
state = 0
message = ""
values = {"Date": 0, "Time": 0, "Real_Speed": 0}
writeFlag = True
logTime = 0
locallog = 0
nowTime = 0
dataUpdated = False
F_checkTime = True
prev_spd = 9999



def checkTime():
    global logTime
    global locallog
    global values
    global dataUpdated
    timesDataMissed = 0
    while (F_checkTime):
        time.sleep(0.1)  # poll ~10 times per second instead of spinning at 100% CPU
        nowTime = time.time()
        if(logTime != 0 and nowTime-logTime >= 30):
            values['Date'] = datetime.date.today().strftime("%d/%m/%Y")
            now = datetime.datetime.now()
            values['Time'] = now.strftime("%H:%M:%S")
            if(dataUpdated):
                q.put(dict(values))  # queue a snapshot; `values` keeps changing
                logTime +=30
                dataUpdated = False
                print(values)
                timesDataMissed=0
            else:
                values['Real_Speed'] = 'NULL'
                q.put(dict(values))  # queue a snapshot; `values` keeps changing
                logTime = nowTime
                dataUpdated = False
                timesDataMissed += 1
                print(values)
                if(timesDataMissed > 10):
                    timesDataMissed = 0
                    logTime = 0


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("something")


def write_csv():
    csvfile = open('InverterDataLogger01.csv', mode='w',
                   newline='',  encoding='utf-8')
    spamwriter = csv.DictWriter(csvfile, fieldnames=header)
    spamwriter.writeheader()
    csvfile.close()
    while writeFlag:
        # print("worker running ",csv_flag)
        time.sleep(0.01)
        # time.sleep(2)
        while not q.empty():
            results = q.get()
            if results is None:
                continue
            csvfile = open('InverterDataLogger01.csv', mode='a',
                           newline='',  encoding='utf-8')
            spamwriter = csv.DictWriter(csvfile, fieldnames=header)
            spamwriter.writerow(results)
            csvfile.close()
            print("Written in csv File")



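def find_spd_val(message):
    # Parsing logic elided in the question. Hypothetical placeholder:
    # treat the whole payload as the speed value.
    realspd = message.strip()
    return realspd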


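def on_message(client, userdata, msg):
    # All globals must be declared before their first use; reading logTime
    # before `global logTime` is a SyntaxError in Python 3.
    global values
    global dataUpdated
    global r_index
    global prev_spd
    global rspd
    global locallog
    global logTime
    message = str(msg.payload.decode("utf-8", "ignore"))
    topic = str(msg.topic)
    if(logTime==0):
        logTime = time.time()
        locallog=logTime

    else:
        try:
            rspd = int(find_spd_val(message))
        except (TypeError, ValueError):
            return  # skip payloads that do not contain a valid speed value
        if(prev_spd == 9999):
            prev_spd = rspd
        else:
            values['Real_Speed'] = rspd
            dataUpdated = True  # tell checkTime() that fresh data arrived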



def on_publish(client, userdata, mid):
    print("Message Published")


client = mqtt.Client("hidden")
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect("hidden")
client.loop_start()
t1 = threading.Thread(target=write_csv)  # start logger
t2 = threading.Thread(target=checkTime)  # start logger

t1.start()  # start logging thread
t2.start()  # start logging thread

print('written')
try:
    while True:
        time.sleep(1)
        pass
except KeyboardInterrupt:
    print("interrupted by keyboard")
    client.loop_stop()  # stop the MQTT network loop
    writeFlag = False  # stop logging thread
    F_checkTime = False
    time.sleep(5)
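In the script above, shutdown relies on module-level flags (`writeFlag`, `F_checkTime`) that the threads poll, followed by a fixed `time.sleep(5)`. A minimal sketch of the same queue-plus-writer-thread pattern using `threading.Event` and `Thread.join()` instead, so the main thread waits exactly until the queue has been drained; `csv_writer` and the `rows` list are hypothetical stand-ins for `write_csv()` and the CSV file:

```python
import queue
import threading


def csv_writer(q: queue.Queue, stop: threading.Event, rows: list) -> None:
    """Drain `q` until `stop` is set and the queue is empty.

    `rows` stands in for the CSV file so the sketch has no side effects.
    """
    while not (stop.is_set() and q.empty()):
        try:
            item = q.get(timeout=0.1)  # block briefly instead of spinning
        except queue.Empty:
            continue
        rows.append(item)


q = queue.Queue()
stop = threading.Event()
rows = []
writer = threading.Thread(target=csv_writer, args=(q, stop, rows))
writer.start()

for i in range(3):
    q.put({"Real_Speed": i})

stop.set()     # ask the writer to finish...
writer.join()  # ...and wait until it has written everything already queued
print(rows)
```

Because all items are queued before `stop.set()`, the writer exits only after it has consumed them, with no arbitrary sleep.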

I want to do the same thing with Python/Flask for multiple devices. I am new to Flask; can you please give me some guidelines on how to implement this functionality in Flask, and what technology I should use?


Solution

  • I'll propose a simple solution that avoids the need to run any timers at the cost of delaying the write to file for a second or so (whether this is an issue or not depends upon your requirements).

    Data from the devices can be stored in a structure that looks something like (a very rough example!):

    from datetime import datetime
    
    dev_info = { 
                'sensor1': {'last_value': .310, 'last_receive': datetime(2021, 8, 28, 12, 8, 1, 1234), 'last_write': datetime(2021, 8, 28, 12, 8, 0, 541)},
                'sensor2': {'last_value': 5.2, 'last_receive': datetime(2021, 8, 28, 12, 7, 59, 1234), 'last_write': datetime(2021, 8, 28, 12, 7, 58, 921)}
               }
    

    Every time a new sample is received (you can probably use a single subscription for this and determine which device the message is from by checking the message topic in on_message):

    1. Retrieve the last_write time for the device from the structure
    2. If it's more than the desired interval old, write out the last_value to your CSV (using the timestamp last_write + interval) and update last_write (a bit of logic is needed here; consider what happens if no data is received for a minute).
    3. Update the info for the device in the structure (last_value / last_receive).
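The three steps above can be sketched as a plain function that an MQTT `on_message` callback could call once per sample. This is a minimal sketch under a few assumptions: the device name has already been taken from the topic (for example the last level of a hypothetical `devices/<name>/speed` topic), `write_row` stands in for whatever appends a row to your CSV, and after a gap any further missed slots are skipped rather than filled in. `on_sample` and `write_row` are hypothetical names:

```python
from datetime import datetime, timedelta
from typing import Callable, Dict

INTERVAL = timedelta(seconds=30)

# Per-device state, keyed by device name (same shape as the structure above).
dev_info: Dict[str, dict] = {}


def on_sample(device: str, value: float, received_at: datetime,
              write_row: Callable[[str, datetime, float], None]) -> None:
    info = dev_info.get(device)
    if info is None:
        # First sample from this device: it also anchors the device's schedule.
        dev_info[device] = {"last_value": value, "last_receive": received_at,
                            "last_write": received_at}
        return
    # Step 1: retrieve the last write time.
    last_write = info["last_write"]
    # Step 2: if at least one interval has passed, write out the previous value.
    if received_at - last_write >= INTERVAL:
        slot = last_write + INTERVAL
        write_row(device, slot, info["last_value"])
        # Skip any further slots missed during a gap, keeping the device's phase.
        missed = (received_at - slot) // INTERVAL
        info["last_write"] = slot + missed * INTERVAL
    # Step 3: update the stored sample.
    info["last_value"] = value
    info["last_receive"] = received_at


rows = []


def record(device: str, ts: datetime, value: float) -> None:
    rows.append((device, ts.strftime("%H:%M:%S"), value))


t0 = datetime(2021, 8, 28, 5, 4, 20)
for offset, speed in [(0, 10), (15, 11), (29, 12), (31, 13), (61, 14)]:
    on_sample("device1", speed, t0 + timedelta(seconds=offset), record)
print(rows)
```

Here the row for 05:04:50 carries 12 (the last value before that slot) and is only written when the 05:04:51 sample arrives. Writing a `NULL` row per missed slot, as the question's script does, would be an equally small change.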

    As I mentioned earlier, the disadvantage of this approach is that a value is only written out once you receive a new value outside the desired time window; however, for many use cases this is fine, and it is considerably simpler than using timers. If you need more timely writes, you could periodically scan the structure for overdue entries and write them out.
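The periodic scan mentioned above could look roughly like this. It is a minimal sketch, assuming the same `dev_info` structure, a 30-second interval, and a scan period well below the interval (a sample arriving between a slot boundary and the next scan is attributed to that slot). `flush_due`, `scanner` and `write_row` are hypothetical names; slots with no new data get `None`, mirroring the question's `NULL` rows:

```python
import threading
from datetime import datetime, timedelta
from typing import Callable, Dict, Optional

INTERVAL = timedelta(seconds=30)
RowWriter = Callable[[str, datetime, Optional[float]], None]


def flush_due(dev_info: Dict[str, dict], now: datetime, write_row: RowWriter) -> None:
    """Write one row per device for every slot (last_write + INTERVAL) already passed."""
    for device, info in dev_info.items():
        while now >= info["last_write"] + INTERVAL:
            slot = info["last_write"] + INTERVAL
            # Only report last_value if something arrived since the previous write.
            fresh = info["last_receive"] > info["last_write"]
            write_row(device, slot, info["last_value"] if fresh else None)
            info["last_write"] = slot


def scanner(dev_info: Dict[str, dict], lock: threading.Lock, stop: threading.Event,
            write_row: RowWriter, period_s: float = 1.0) -> None:
    """Background loop: call flush_due() every period_s seconds until stop is set."""
    while not stop.wait(period_s):
        with lock:  # the MQTT callback must hold the same lock when updating dev_info
            flush_due(dev_info, datetime.now(), write_row)


# Usage with fixed times instead of the clock:
rows = []


def record(device: str, ts: datetime, value: Optional[float]) -> None:
    rows.append((device, ts.strftime("%H:%M:%S"), value))


info = {"sensor1": {"last_value": 12, "last_receive": datetime(2021, 8, 28, 5, 4, 49),
                    "last_write": datetime(2021, 8, 28, 5, 4, 20)}}
flush_due(info, datetime(2021, 8, 28, 5, 4, 51), record)  # slot 05:04:50 gets 12
flush_due(info, datetime(2021, 8, 28, 5, 5, 51), record)  # no new data: 05:05:20 and 05:05:50 get None
print(rows)
```

Running `scanner` in its own `threading.Thread` alongside the MQTT loop gives writes within about `period_s` of each slot instead of waiting for the next sample.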

    There are a few other factors you may want to consider:

    • MQTT does not guarantee real-time delivery (particularly at QoS 1+).
    • Communications with IoT units can often be spotty, so using QoS 1+ (and clean_session=False) is worth considering.
    • Based on the above you may want to consider embedding timestamps in the messages (but this does lead to a need to keep the remote device clocks synchronised).
    • Storage is cheap - are you sure there is no benefit to storing all data received and then downsampling later?
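If you do decide to store every sample, the 30-second series can be derived afterwards. A minimal sketch, assuming one device's samples are available in memory as (timestamp, value) pairs (for example read back from the CSV); `downsample_last` is a hypothetical helper that keeps the last value in each 30-second window, with windows anchored at the device's first sample to match the schedule in the question:

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def downsample_last(samples: List[Tuple[datetime, float]],
                    interval: timedelta = timedelta(seconds=30)) -> List[Tuple[datetime, float]]:
    """Return (window_end, last_value) for each window that contains samples.

    Window k covers [start + k*interval, start + (k+1)*interval), where start is
    the first sample's timestamp; empty windows are omitted.
    """
    if not samples:
        return []
    ordered = sorted(samples, key=lambda s: s[0])
    start = ordered[0][0]
    last_in_window: Dict[int, float] = {}
    for ts, value in ordered:
        k = (ts - start) // interval
        last_in_window[k] = value  # later samples overwrite earlier ones
    return [(start + (k + 1) * interval, v) for k, v in sorted(last_in_window.items())]


t0 = datetime(2021, 8, 28, 5, 4, 20)
samples = [(t0 + timedelta(seconds=s), v) for s, v in [(0, 10), (15, 11), (29, 12), (31, 13), (61, 14)]]
for end, value in downsample_last(samples):
    print(end.strftime("%H:%M:%S"), value)
```

Because the raw data is kept, the interval (or the "last value" rule, versus an average) can be changed later without losing anything.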