Search code examples
python, database, time-series, influxdb

Check for existing data before writing to database


When writing time series data from a pandas dataframe to an InfluxDB bucket, I want to check whether a specific row of data already exists in the bucket (and thus prevent that data from being written again).

Format of the time series data that exists in the pandas dataframe (sample):

epoch,open,high,low,close,volume
1332374520.0,2.341,2.341,2.341,2.341,1.0
1332374700.0,2.343,2.343,2.343,2.343,1.0
1332374940.0,2.344,2.344,2.344,2.344,1.0
1332375420.0,2.344,2.344,2.344,2.344,2.0
1332375660.0,2.344,2.344,2.344,2.344,2.0
1332376080.0,2.344,2.344,2.344,2.344,1.0

The current Python program, shown below, isn't detecting that the same data has already been written to the database bucket. If the program is run over and over, the output from the print statement should be visible, notifying that duplicate data has been detected.

import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi

# Example OHLCV data, listed one candle per row and then transposed into the
# column-oriented mapping that pandas consumes.
_OHLCV_COLUMNS = ("epoch", "open", "high", "low", "close", "volume")
_OHLCV_ROWS = (
    (1330902000, 2.55, 2.55, 2.521, 2.534, 150),
    (1330902060, 2.532, 2.538, 2.531, 2.538, 69),
    (1330902120, 2.537, 2.549, 2.537, 2.548, 38),
)
data = {
    column: list(values)
    for column, values in zip(_OHLCV_COLUMNS, zip(*_OHLCV_ROWS))
}
concat_of_all_dfs = pd.DataFrame(data)

def data_point_exists(epoch, bucket, org):
    """Report whether the Flux query built for ``epoch`` returns any tables.

    Uses the module-level ``query_api`` that the ``__main__`` block creates,
    so it can only be called after that assignment has run.

    Args:
        epoch: Value interpolated into the ``r["epoch"]`` comparison.
        bucket: Bucket name interpolated into ``from(bucket: ...)``.
        org: Organisation passed through to ``query_api.query``.

    Returns:
        True when the query result is non-empty, False otherwise.
    """
    # NOTE(review): the write loop stores "epoch" with Point.field(), so in
    # un-pivoted query output it presumably shows up as rows whose _field is
    # "epoch" rather than as a column named "epoch". If so, the second filter
    # below matches nothing and this function always returns False - confirm.
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0)
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> filter(fn: (r) => r["epoch"] == {epoch})
    '''
    result = query_api.query(org=org, query=query)
    # The result is treated as a sized collection; any content counts as a hit.
    return len(result) > 0

if __name__ == "__main__":

    # Connection settings; the API token is read from the environment
    url = "http://localhost:8086"
    org = "organisation_test"
    bucket = "bucket_test"
    token = os.getenv('INFLUXDB_TOKEN')

    # Create the client together with its write and query handles
    client = InfluxDBClient(url=url, token=token, org=org)
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # Insert rows individually, skipping any that data_point_exists reports
    for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
        epoch = row['epoch']
        if data_point_exists(epoch, bucket, org):
            print(f"Data point for epoch {epoch} already exists. Skipping...")
            continue

        point = (
            Point("ohlcv")
            .field("epoch", row['epoch'])
            .field("open", row['open'])
            .field("high", row['high'])
            .field("low", row['low'])
            .field("close", row['close'])
            .field("volume", row['volume'])
        )
        write_api.write(bucket=bucket, org=org, record=point)

    client.close()

Are there any mistakes in the above code that would prevent repeat data from being detected (possibly in the querying function seen via the flux script, or anywhere else)?


Solution

  • I solved the issue by modifying the query code to search for the epoch in the table for each iteration. It would be good to optimise this program for writing in batches to the database whilst retaining the functionality of checking for duplicates as it's quite slow, but for now this works.

    Please note some functions relating to the pandas dataframe have been omitted.

    import os
    import pandas as pd
    from tqdm import tqdm
    from influxdb_client import InfluxDBClient, Point, WriteOptions
    from influxdb_client.client.write_api import SYNCHRONOUS
    from influxdb_client.client.query_api import QueryApi
    import datetime
    
    def data_point_exists(epoch, bucket, org):
        """Return True if an ``ohlcv`` point whose ``epoch`` field equals *epoch* exists.

        The match is done inside InfluxDB: only rows of the ``epoch`` field are
        inspected and at most one record per table is returned, so the cost of
        a call no longer grows with the amount of data transferred to the
        client. Uses the module-level ``query_api`` created in the
        ``__main__`` block.

        Args:
            epoch: Epoch value to look for; converted with ``float()`` before
                being bound into the query.
            bucket: Name of the bucket to search.
            org: Organisation passed through to ``query_api.query``.

        Returns:
            True if at least one matching record is returned, otherwise False.
        """
        # "epoch" is written with Point.field(), so it is matched through
        # _field/_value instead of pivoting the whole measurement. float()
        # on both sides keeps the comparison type-consistent whether the field
        # was stored as an integer or as a float.
        flux_query = f'''
        from(bucket: "{bucket}")
          |> range(start: 0)  // no explicit timestamp is set on write, so no narrower range is known
          |> filter(fn: (r) => r["_measurement"] == "ohlcv" and r["_field"] == "epoch")
          |> filter(fn: (r) => float(v: r["_value"]) == epoch_value)
          |> limit(n: 1)
        '''

        # Bind the value as a query parameter rather than formatting it into
        # the Flux text, which avoids depending on how Python prints floats.
        result = query_api.query(
            org=org,
            query=flux_query,
            params={"epoch_value": float(epoch)},
        )

        # Any non-empty table means the epoch is already stored.
        return any(table.records for table in result)
    
    if __name__ == "__main__":

        # Database credentials
        token = os.getenv('INFLUXDB_TOKEN')
        bucket = "bucket"
        org = "org"
        url = "http://localhost:8086"

        # Initialize InfluxDB Client
        client = InfluxDBClient(url=url, token=token, org=org)
        try:
            write_api = client.write_api(write_options=SYNCHRONOUS)
            query_api = client.query_api()

            # Write data points one by one, skipping epochs already stored
            for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
                epoch = row['epoch']
                if data_point_exists(epoch, bucket, org):
                    print("Repeat detected")
                    continue

                point = (
                    Point("ohlcv")
                    .field("epoch", row['epoch'])
                    .field("open", row['open'])
                    .field("high", row['high'])
                    .field("low", row['low'])
                    .field("close", row['close'])
                    .field("volume", row['volume'])
                )
                write_api.write(bucket=bucket, org=org, record=point)
        finally:
            # Close the client even when a query or write raises part-way
            # through the loop.
            client.close()