I want to send data to an MQTT Broker (Cloud) by querying measurements from InfluxDB.
I have a field in the schema called status. It can be either 1 or 0. status=0 indicates that the point has not been sent to the cloud. If I get an acknowledgment from the MQTT Broker, I want to write the point back into the database with status=1.
As mentioned in the InfluxDB FAQ on duplicate data, if a point is written with the same measurement, tag set, and timestamp as an existing point but with a different field value, the updated field value is shown.
In order to test this I created the following:
CREATE DATABASE dummy
USE dummy
INSERT meas_1,type=t1 status=0,value=123 1536157064275338300
query:
SELECT * FROM meas_1
provides
time                status type value
1536157064275338300 0      t1   123
Now, if I want to overwrite the point, I do the following:
INSERT meas_1,type=t1 status=1,value=123 1536157064275338300
which overwrites the point:
time                status type value
1536157064275338300 1      t1   123
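The overwrite rule can be illustrated with a small in-memory model. This is only a toy sketch, not how InfluxDB stores data: the `store` dictionary and the `write_point` helper are hypothetical names made up for illustration. It assumes that a point is identified by its measurement, tag set, and timestamp, and that fields written to an existing point replace the old values.

```python
# Toy in-memory model of the overwrite rule (not the real storage engine).
store = {}

def write_point(measurement, tags, fields, timestamp):
    """Write a point keyed by (measurement, tag set, timestamp)."""
    key = (measurement, tuple(sorted(tags.items())), timestamp)
    # Fields of an existing point are merged; newly written values win.
    store.setdefault(key, {}).update(fields)
    return key

ts = 1536157064275338300
write_point("meas_1", {"type": "t1"}, {"status": 0.0, "value": 123.0}, ts)
# Same measurement, tags and timestamp: the status field is overwritten.
write_point("meas_1", {"type": "t1"}, {"status": 1.0}, ts)
# A different tag value gives a different key, so a separate point is stored.
write_point("meas_1", {"type": "t2"}, {"status": 1.0}, ts)

print(len(store))
```

In this model the first two writes land on the same key, while the third creates a second entry, which is why a status flag kept in a tag could not be flipped in place.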
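(Note: this is currently not possible with tags in InfluxDB, because a different tag value creates a separate series.)
The idea is to query the points where "status"=0, send them to the broker, and write them back with status=1 once they are acknowledged. I am using the InfluxDBClient for Python 3 to create the application (MQTT + InfluxDB).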
The write_points API has a batch_size parameter, which requires an int as input.
I am not sure how to use this in the application I want to build. Can someone guide me on this, or on the schema of the DB, so that I can upload actual, non-redundant information to the cloud?
The batch_size is the number of points written per request. To write everything in one go, set it to the length of the list of points that is passed to write_points.
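To make the effect of batch_size concrete, here is a rough sketch of what splitting a list of points into batches looks like. The helper `split_into_batches` is a hypothetical name written for this illustration and is not taken from the client library; it only assumes that batching means slicing the list into consecutive chunks.

```python
def split_into_batches(points, batch_size):
    """Yield consecutive slices holding at most batch_size points each."""
    for start in range(0, len(points), batch_size):
        yield points[start:start + batch_size]

# Five placeholder points, just to have something to split.
points = [{"measurement": "gps", "fields": {"status": 1.0}} for _ in range(5)]

small = list(split_into_batches(points, 2))            # several small batches
whole = list(split_into_batches(points, len(points)))  # one batch with everything

print([len(b) for b in small], [len(b) for b in whole])
```

With batch_size equal to the length of the list, everything ends up in a single batch, which is the choice made in the steps that follow.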
Create the client and query the measurement (here, we query GPS information):
from influxdb import InfluxDBClient
client = InfluxDBClient(database='dummy')
op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
Make the ResultSet into a list:
batch = list(op.get_points('gps'))
Create an empty list for the updated points:
updated_batch = []
Loop over each point and change the status flag to 1. Note that numeric field values in InfluxDB are stored as floats by default, so the values are cast with float() to keep the field types consistent:
for each in batch:
    new_mes = {
        'measurement': 'gps',
        'tags': {
            'type': 'gps'
        },
        'time': each['time'],
        'fields': {
            'lat': float(each['lat']),
            'lon': float(each['lon']),
            'alt': float(each['alt']),
            'status': float(1)
        }
    }
    updated_batch.append(new_mes)
Finally, write the points back via the client, with batch_size set to the length of updated_batch:
client.write_points(updated_batch, batch_size=len(updated_batch))
This overwrites the existing points because the new points carry the same timestamps, now with the status field set to 1.
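Tying this back to the question, the status flag should only be flipped for points the broker has acknowledged. The following is a minimal sketch under stated assumptions: `mark_acknowledged` is a hypothetical helper, and `publish` and `write_points` are caller-supplied callables (for example, thin wrappers around an MQTT client and around client.write_points). Here they are replaced by simple stand-ins so the example runs without a broker or database.

```python
def mark_acknowledged(points, publish, write_points,
                      field_keys=("lat", "lon", "alt")):
    """Publish each point and write back status=1 for acknowledged ones.

    publish(point) must return True when the broker acknowledged the point.
    write_points(list_of_points) must persist the rebuilt points.
    Both are hypothetical callables supplied by the caller.
    """
    updated = []
    for point in points:
        if not publish(point):
            continue  # not acknowledged: the stored point keeps status=0
        fields = {key: float(point[key]) for key in field_keys}
        fields["status"] = 1.0
        updated.append({
            "measurement": "gps",
            "tags": {"type": "gps"},
            "time": point["time"],  # same timestamp, so the point is overwritten
            "fields": fields,
        })
    if updated:
        write_points(updated)
    return updated

# Stand-ins for the real MQTT publish and InfluxDB write, for demonstration only.
written = []
batch = [
    {"time": 1, "lat": 50.0, "lon": 8.0, "alt": 100.0, "status": 0.0},
    {"time": 2, "lat": 51.0, "lon": 9.0, "alt": 110.0, "status": 0.0},
]
result = mark_acknowledged(batch,
                           publish=lambda p: p["time"] == 1,  # only the first is "acknowledged"
                           write_points=written.extend)
print(len(result))
```

In a real application, write_points could be something like `lambda pts: client.write_points(pts, batch_size=len(pts))`, and publish would wrap whatever acknowledgment mechanism the MQTT client offers.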