This is my first question here, so please pardon any mistakes. I am trying to develop software using Python/Flask that continuously receives IoT data from multiple devices and logs it at specific time intervals. For example, I have 3 devices and log data at 30-second intervals: device1, device2 and device3 send their first data at 5:04:20, 5:04:29 and 5:04:31 respectively. The devices then keep sending data every 1 or 2 seconds. I want to keep track of the latest data and ensure that the next values are logged at 5:04:50, 5:04:59 and 5:05:01 respectively, then at 5:05:20 and so on.
I have written a script that does this for a single device using threading. Here is the code:
import paho.mqtt.client as mqtt
import csv
import os
import datetime
import threading
import queue
import time
q = queue.Queue()
header = ["Date", "Time", "Real_Speed"]
Rspd_key_1 = "key1="
Rspd_key_2 = "key2="
state = 0
message = ""
values = {"Date": 0, "Time": 0, "Real_Speed": 0}
writeFlag = True
logTime = 0
locallog = 0
nowTime = 0
dataUpdated = False
F_checkTime = True
prev_spd = 9999
def checkTime():
    global logTime
    global locallog
    global values
    global dataUpdated
    timesDataMissed = 0
    while (F_checkTime):
        nowTime = time.time()
        # A 30-second slot has elapsed since the last log time.
        if(logTime != 0 and nowTime-logTime >= 30):
            values['Date'] = datetime.date.today().strftime("%d/%m/%Y")
            now = datetime.datetime.now()
            values['Time'] = now.strftime("%H:%M:%S")
            if(dataUpdated):
                # Queue a copy so later updates do not alter the queued row.
                q.put(dict(values))
                logTime +=30
                dataUpdated = False
                print(values)
                timesDataMissed=0
            else:
                # No new data arrived during this slot: log NULL.
                values['Real_Speed'] = 'NULL'
                q.put(dict(values))
                logTime = nowTime
                dataUpdated = False
                timesDataMissed += 1
                print(values)
                if(timesDataMissed > 10):
                    # Stop logging until the device sends data again.
                    timesDataMissed = 0
                    logTime = 0
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("something")
def write_csv():
    # Create the file and write the header row once.
    csvfile = open('InverterDataLogger01.csv', mode='w',
                   newline='', encoding='utf-8')
    spamwriter = csv.DictWriter(csvfile, fieldnames=header)
    spamwriter.writeheader()
    csvfile.close()
    while writeFlag:
        # print("worker running ",csv_flag)
        time.sleep(0.01)
        # time.sleep(2)
        # Drain the queue, appending one row per queued item.
        while not q.empty():
            results = q.get()
            if results is None:
                continue
            csvfile = open('InverterDataLogger01.csv', mode='a',
                           newline='', encoding='utf-8')
            spamwriter = csv.DictWriter(csvfile, fieldnames=header)
            spamwriter.writerow(results)
            csvfile.close()
            print("Written in csv File")
def find_spd_val(message):
    # Do Something
    return realspd
def on_message(client, userdata, msg):
    message = str(msg.payload.decode("utf-8", "ignore"))
    topic = str(msg.topic)
    # All global declarations must come before the names are first used.
    global values
    global dataUpdated
    global r_index
    global prev_spd
    global rspd
    global locallog
    global logTime
    if(logTime==0):
        # First message: start the 30-second schedule from now.
        logTime = time.time()
        locallog=logTime
    else:
        try:
            rspd = int(find_spd_val(message))
        except:
            pass
        if(prev_spd == 9999):
            prev_spd = rspd
        else:
            values['Real_Speed'] = rspd
def on_publish(client, userdata, mid):
    print("Message Published")
client = mqtt.Client("hidden")
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect("hidden")
client.loop_start()
t1 = threading.Thread(target=write_csv) # CSV writer thread
t2 = threading.Thread(target=checkTime) # interval checker thread
t1.start() # start CSV writer thread
t2.start() # start interval checker thread
print('written')
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("interrupted by keyboard")
    client.loop_stop() # stop the MQTT network loop
    writeFlag = False # stop logging thread
    F_checkTime = False # stop interval checker thread
    time.sleep(5)
I want to do the same thing using Python/Flask to handle multiple devices. I am new to Flask; can you please give me some guidelines on how to achieve this functionality in Flask, and which technology I should use?
I'll propose a simple solution that avoids the need to run any timers at the cost of delaying the write to file for a second or so (whether this is an issue or not depends upon your requirements).
Data from the devices can be stored in a structure that looks something like (a very rough example!):
from datetime import datetime
dev_info = {
    'sensor1': {'last_value': 0.310, 'last_receive': datetime(2021, 8, 28, 12, 8, 1, 1234), 'last_write': datetime(2021, 8, 28, 12, 8, 0, 541)},
    'sensor2': {'last_value': 5.2, 'last_receive': datetime(2021, 8, 28, 12, 7, 59, 1234), 'last_write': datetime(2021, 8, 28, 12, 7, 58, 921)}
}
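Since the structure is keyed by device name, each incoming message has to be mapped to a device. A minimal sketch of one way to do that, assuming a hypothetical topic layout of devices/<name>/speed (the layout and the helper name device_from_topic are illustrative, not taken from the question):

```python
def device_from_topic(topic):
    """Return the device name from a topic such as 'devices/sensor1/speed'.

    The 'devices/<name>/speed' layout is a hypothetical example; adapt the
    checks to wherever the device name sits in your own topics.
    """
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "devices" or parts[2] != "speed":
        return None  # not a topic this logger is interested in
    return parts[1]
```

With a layout like this, a single client.subscribe("devices/+/speed") would cover every device (+ is the MQTT single-level wildcard), and on_message could call device_from_topic(msg.topic) to pick the right entry.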
Every time a new sample is received (you can probably use a single subscription for this and determine which device the message is from by checking the message topic in on_message):
1. Retrieve the last_write time for the device from the structure.
2. If the desired interval has passed since last_write, write last_value to your CSV (using the timestamp last_write + interval) and update last_write (a bit of logic is needed here; consider what happens if no info is received for a minute).
3. Update the info for the device in the structure (last_value / last_receive).
As I mentioned earlier, the disadvantage of this is that a value is only written out after you receive a new value outside of the desired time window; however, for many use cases this is fine and will be considerably simpler than using timers. If you need more frequent writes, you could periodically scan for old data in the structure and write it out.
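The per-sample logic described above could be sketched roughly as follows. This is only an illustration under some assumptions: the 30-second interval comes from the question, the function name handle_sample and the row layout are made up, the first sample from a device is logged immediately, and a slot with no fresh data is logged as NULL (as the question's script does).

```python
import csv
import io
from datetime import datetime, timedelta

INTERVAL = timedelta(seconds=30)  # assumed logging interval from the question

# Per-device state, keyed by device name (same shape as dev_info).
dev_info = {}


def handle_sample(device, value, received, writer):
    """Process one sample; write a row for every logging slot that has elapsed."""
    info = dev_info.get(device)
    if info is None:
        # Assumption: the first sample is logged and starts the schedule.
        writer.writerow([device, received.isoformat(), value])
        dev_info[device] = {"last_value": value, "last_receive": received,
                            "last_write": received}
        return
    # Write one row per elapsed slot, stamped last_write + interval.
    while received - info["last_write"] >= INTERVAL:
        slot = info["last_write"] + INTERVAL
        # The stored value is only usable if it arrived during this slot.
        fresh = info["last_receive"] > info["last_write"]
        logged = info["last_value"] if fresh else "NULL"
        writer.writerow([device, slot.isoformat(), logged])
        info["last_write"] = slot
    # Remember the newest sample for the next slot.
    info["last_value"] = value
    info["last_receive"] = received
```

In on_message you would call handle_sample with the device name, the parsed value, datetime.now() and a csv.writer for your log file. Because last_write advances by exactly one interval each time, the slots stay aligned to the device's first sample (5:04:20, 5:04:50, 5:05:20, ...) rather than drifting.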
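There are a few other factors you may want to consider:
- If you cannot afford to lose messages sent while your client is disconnected, a persistent session (clean_session=False) is worth considering.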