Tags: python, pandas, numpy, influxdb, telegraf

Re-synchronizing timestamps of InfluxDB values


I have several values being written to an InfluxDB database under different topics. They arrive via MQTT as JSON and are processed by Telegraf. Each JSON record produces a tuple of entries in different series, with slightly different timestamps. The ∆ within a tuple is typically below 1 millisecond, whereas the JSON events are several seconds apart, so detecting which entries belong together should be manageable.

For further analysis I need to regroup the entries so that the tuples are united again. Simply rounding the timestamps would work in most cases, but not in general, because a rounding boundary may fall right between the entries of a tuple.
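
To make the boundary problem concrete, here is a minimal sketch with two made-up timestamps 0.4 ms apart that straddle a half-second mark. `round_to_second` is a hypothetical helper written for this illustration, not part of any library.

```python
from datetime import datetime, timedelta, timezone

def round_to_second(ts: datetime) -> datetime:
    """Round a timestamp to the nearest full second (hypothetical helper)."""
    floored = ts.replace(microsecond=0)
    if ts.microsecond >= 500_000:
        return floored + timedelta(seconds=1)
    return floored

# Two entries of the same tuple, 0.4 ms apart (made-up values)
a = datetime(2024, 1, 1, 0, 0, 0, 499_800, tzinfo=timezone.utc)
b = a + timedelta(microseconds=400)

# Rounding puts them into different buckets although they belong together
print(round_to_second(a), round_to_second(b))
# Comparing the gap between neighbours does not have this problem
print((b - a) < timedelta(seconds=0.5))
```

The two entries round to different seconds, while the gap between them is still far below any sensible grouping threshold.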

Any ideas? I am accessing data via Python, so either an appropriate Influx query or processing in Python will do. Of course I can scan through the data in a loop and decide how to put them together, but I wonder if there is an elegant solution already at hand, maybe using one of the common Python libraries such as NumPy or Pandas. I assume I am not the only person who is faced with this kind of problem.


Solution

  • OK, I opted for processing my data in Python. This is what I came up with. It starts a new group whenever the gap to the previous entry exceeds DELTA, so it can handle arbitrary granularity intervals. It's fast enough for the data volume I am handling; apart from the sort, it's O(n).

    import datetime as dt
    from datetime import datetime, timedelta

    # input is res: dictionary of {timestamp: (topic, value)};
    # timestamps must be timezone-aware, since t below is initialised in UTC
    DELTA = timedelta(seconds=0.5)  # granularity; adjust to needs
    t = datetime(dt.MINYEAR, 1, 1, tzinfo=dt.timezone.utc)  # init running timestamp var
    t_group = t  # timestamp for sample group
    outlist = []  # output list
    group = None  # current sample group
    for key, (topic, val) in sorted(res.items()):
        if (key - t) > DELTA:  # gap to previous entry exceeds DELTA: new group
            t_group = key
            if group:
                outlist.append(group)  # save previous group to output list
            group = {"time": t_group}  # init new group with 1st timestamp
        print(f"{t_group}\t{key}\t{topic} = {val}\t∆={key-t}")
        group[topic] = val  # add to group
        t = key
    if group:
        outlist.append(group)  # no gap follows the last group, so save it here
    print(f"\n{len(outlist)} entries extracted.\n")
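
Since the question asks about Pandas, the same gap-based grouping can also be sketched with `diff` and `cumsum`. This is a minimal sketch, not the method used above. It assumes the data is available as a long-format DataFrame; the column names `time`, `topic`, and `value`, the function name `regroup_by_gap`, and the sample values are all made up for illustration.

```python
import pandas as pd

def regroup_by_gap(df: pd.DataFrame, delta: pd.Timedelta) -> pd.DataFrame:
    """Turn long-format rows (time, topic, value) into one row per burst."""
    df = df.sort_values("time")
    # A new group starts wherever the gap to the previous row exceeds delta.
    # The first row has no predecessor (its diff is NaT), so it joins group 0.
    group_id = (df["time"].diff() > delta).cumsum()
    # Label every row with the earliest timestamp of its group.
    group_time = df.groupby(group_id)["time"].transform("min")
    # One row per group, one column per topic; if a topic occurs twice
    # within a group, the later value wins.
    return df.groupby([group_time, "topic"])["value"].last().unstack("topic")

# Made-up sample: two bursts about 5 s apart, each with two topics
samples = pd.DataFrame({
    "time": pd.to_datetime(
        [
            "2024-01-01 00:00:00.9996",
            "2024-01-01 00:00:01.0003",
            "2024-01-01 00:00:06.0001",
            "2024-01-01 00:00:06.0004",
        ],
        utc=True,
    ),
    "topic": ["temp", "hum", "temp", "hum"],
    "value": [20.5, 40.0, 20.7, 41.0],
})

wide = regroup_by_gap(samples, pd.Timedelta(seconds=0.5))
print(wide)
```

The first burst straddles a full-second boundary, which is the case where plain rounding would split the tuple; comparing neighbouring gaps keeps it together. As in the loop version, each group is labelled with its first timestamp.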