Tags: python-3.x, influxdb, influxdb-python

Dump series back into InfluxDB after querying with replaced field value


Scenario

I want to send data to an MQTT Broker (Cloud) by querying measurements from InfluxDB.

I have a field in the schema called status, which can be either 1 or 0. status=0 indicates that the point has not yet been sent to the cloud. If I get an acknowledgment from the MQTT Broker, I want to write the point back into the database with status=1.

According to the InfluxDB FAQ on duplicate data, writing a point with the same measurement, tag set, and timestamp as an existing point but with a different field value overwrites the old field value with the new one.

In order to test this I created the following:

CREATE DATABASE dummy
USE dummy
INSERT meas_1,type=t1 status=0,value=123 1536157064275338300

query:

SELECT * FROM meas_1

provides

time                status type value
1536157064275338300 0      t1   123

Now, if I want to overwrite the point, I do the following:

INSERT meas_1,type=t1 status=1,value=123 1536157064275338300

which overwrites the existing point:

time                status type value
1536157064275338300 1      t1   123

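(Note: this does not work with tags. Tags are part of the series key, so writing a different tag value creates a new series instead of overwriting the existing point.)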
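
To make the overwrite rule concrete, here is a toy model in plain Python. It is only a sketch of the behaviour described above, not how InfluxDB stores data internally; the `store` dict and the `write` function are made up for illustration.

```python
# Toy model of the duplicate-point rule (NOT the real storage engine).
store = {}

def write(measurement, tags, fields, timestamp):
    # A point is identified by its measurement, tag set and timestamp.
    key = (measurement, tuple(sorted(tags.items())), timestamp)
    # New field values are merged over whatever is already stored under that key.
    store.setdefault(key, {}).update(fields)

ts = 1536157064275338300
write("meas_1", {"type": "t1"}, {"status": 0.0, "value": 123.0}, ts)
write("meas_1", {"type": "t1"}, {"status": 1.0}, ts)  # same key: status is overwritten
write("meas_1", {"type": "t2"}, {"status": 1.0}, ts)  # different tag value: a second point

print(len(store))  # 2
```

The second write changes status in place, while the third one ends up as a separate entry because the tag value differs.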

Usage

  1. Query some information using the client with "status"=0.
  2. Restructure JSON to be sent to the cloud
  3. Send the information to cloud
  4. If successful then write the output from Step 1. back into the DB but with status=1.
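
The four steps above could be sketched roughly as follows. This is a minimal outline under assumptions: `publish` and `write_back` are hypothetical callables standing in for the MQTT client and the InfluxDB client, and `publish` is assumed to return True only when the broker acknowledged the message.

```python
import json

def forward_unsent(rows, publish, write_back):
    """Publish each row and write back only the acknowledged ones with status=1.

    publish(payload) -> bool and write_back(rows) are hypothetical callables
    supplied by the caller; they are not part of any library API.
    """
    acknowledged = []
    for row in rows:
        payload = json.dumps(row)                      # step 2: restructure for the cloud
        if publish(payload):                           # step 3: True means acknowledged
            acknowledged.append({**row, "status": 1})  # step 4: flip the flag
    if acknowledged:
        write_back(acknowledged)
    return acknowledged

# Usage with stand-ins: only the row with time == 1 is "acknowledged".
rows = [{"time": 1, "status": 0, "value": 123}, {"time": 2, "status": 0, "value": 456}]
written = []
sent = forward_unsent(
    rows,
    publish=lambda payload: json.loads(payload)["time"] == 1,
    write_back=written.extend,
)
print(sent)  # [{'time': 1, 'status': 1, 'value': 123}]
```

Rows that were not acknowledged keep status=0 in the database, so the next query picks them up again.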

I am using the Python 3 InfluxDBClient (influxdb-python) to build the application (MQTT + InfluxDB).

The write_points API has a batch_size parameter, which takes an int.

I am not sure how to use this in my application. Can someone guide me on this, or on the DB schema, so that I upload only actual, non-redundant information to the cloud?


Solution

  • batch_size is the number of points sent per write request: write_points splits the list of points into chunks of that size. Passing the length of the list therefore writes everything in a single batch.
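
As a rough illustration of what a batch size means for a list of points, the sketch below splits a list into chunks. `split_into_batches` is a hypothetical helper written for this example; it is not the library's internal code.

```python
def split_into_batches(points, batch_size=None):
    """Hypothetical helper: split a list of points into chunks of batch_size."""
    if not batch_size or batch_size <= 0:
        return [points]  # no batching: everything goes out in one request
    return [points[i:i + batch_size] for i in range(0, len(points), batch_size)]

points = [{"n": i} for i in range(5)]
print([len(b) for b in split_into_batches(points, batch_size=2)])            # [2, 2, 1]
print([len(b) for b in split_into_batches(points, batch_size=len(points))])  # [5]
```

With batch_size equal to the length of the list there is only one chunk, which is the same result as not batching at all.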

    Steps

    1. Create the client and query the measurement (here, we query GPS information):

      from influxdb import InfluxDBClient

      client = InfluxDBClient(database='dummy')
      
      # epoch='ns' returns timestamps as integer nanoseconds
      op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
      
    2. Make the ResultSet into a list:

       batch = list(op.get_points('gps'))
      
    3. Create an empty list for the updated points:

       updated_batch = []
      
    4. Loop over each point and set the status flag to 1. Note that InfluxDB stores numeric field values as floats by default, so cast them with float() to avoid a field type conflict on write:

      for each in batch:
          new_mes = {
              'measurement': 'gps',
              'tags': {
                  'type': 'gps'
              },
              'time': each['time'],
              'fields': {
                  'lat': float(each['lat']),
                  'lon': float(each['lon']),
                  'alt': float(each['alt']),
                  'status': float(1)
              }
          }
          updated_batch.append(new_mes)
      
    5. Finally, write the points back via the client, with batch_size set to the length of updated_batch:

      client.write_points(updated_batch, batch_size=len(updated_batch))
      

    This overwrites the existing points because the new ones have the same measurement, tag set, and timestamps, with the status field set to 1.
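
The rebuild step can also be written as a small reusable function. This is a sketch under assumptions: `mark_as_sent` is a hypothetical helper, the tag keys must be passed in by the caller (a `SELECT *` result does not distinguish tags from fields), and all numeric fields are assumed to be stored as floats.

```python
def mark_as_sent(rows, measurement, tag_keys=("type",), status_field="status"):
    """Hypothetical helper: turn queried rows into write-ready points with status=1."""
    points = []
    for row in rows:
        fields = {}
        for key, value in row.items():
            if key == "time" or key in tag_keys or value is None:
                continue
            # Assumption: numeric fields are floats in the database, so cast ints too.
            is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
            fields[key] = float(value) if is_number else value
        fields[status_field] = 1.0
        points.append({
            "measurement": measurement,
            # Keep the original tag set so the write targets the same series.
            "tags": {key: row[key] for key in tag_keys if row.get(key) is not None},
            "time": row["time"],
            "fields": fields,
        })
    return points

rows = [{"time": 1536157064275338300, "type": "gps",
         "lat": 48.1, "lon": 11.5, "alt": 520, "status": 0}]
points = mark_as_sent(rows, "gps")
print(points[0]["fields"])  # {'lat': 48.1, 'lon': 11.5, 'alt': 520.0, 'status': 1.0}
```

The returned list has the same shape as updated_batch in the steps above, so it could be passed to client.write_points in the same way.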