
Grouping by increasing stateDuration resets using Flux in InfluxDB


I am recording the period between application heartbeats in InfluxDB. The target period is 2000 ms. If the period is above 2750 ms, it is defined as a "lag event".

My end goal is to compute statistics on how long we run without lag events.

I switched from InfluxQL to Flux so that I could use the stateDuration() function. With the query below, I can collect the increasing durations; at each lag event, state_duration is reset to -1.

from (bucket: "sampledb/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
  r._measurement == "timers" and
  r._field == "HeartbeatMs" and
  r.character == "Tarek"
  )
|> stateDuration(fn: (r) => 
    r._value<=2750,
    column: "state_duration",
    unit: 1s
   )
|> keep(columns: ["_time","state_duration"])
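To check the expected state_duration values without an InfluxDB instance, here is a small Python sketch that mimics stateDuration() as we understand its documented behaviour. The first record of a run where the predicate holds gets 0. Later records in that run get the time since the run's first record, in whole units. A record where the predicate fails gets -1. This is not Flux. The function name state_duration and the readings, including the 3100 ms lag event, are hypothetical and made up for illustration.

```python
from datetime import datetime, timedelta


def state_duration(rows, predicate, unit=timedelta(seconds=1)):
    """Mimic Flux stateDuration() on time-sorted (timestamp, value) rows.

    Rows where predicate(value) is true get the time elapsed since the first
    row of the current true-run, in whole `unit`s (first row of a run -> 0).
    Rows where predicate(value) is false get -1 and end the current run.
    """
    result = []
    run_start = None
    for ts, value in rows:
        if predicate(value):
            if run_start is None:
                run_start = ts  # first record of a new lag-free run
            result.append((ts - run_start) // unit)
        else:
            run_start = None  # lag event: reset the run
            result.append(-1)
    return result


t0 = datetime(2021, 1, 12, 14, 49, 34)
# Hypothetical readings: (seconds offset, HeartbeatMs); 3100 is an invented lag event.
readings = [(0, 2717), (2, 1282), (4, 2015), (6, 3100), (9, 1984), (11, 2140)]
rows = [(t0 + timedelta(seconds=s), ms) for s, ms in readings]

print(state_duration(rows, lambda ms: ms <= 2750))  # [0, 2, 4, -1, 0, 2]
```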

At this point I would like to collect max(state_duration) for each run between lag events, and this is where I get stuck: I am trying to "group by every new stateDuration sequence", i.e. group by increasing stateDurations.

I was thinking it might be possible to use reduce() or map() to inject a sequence number to group by, incrementing it whenever state_duration is -1.

Below is a graph of state_duration from the Flux query. I am basically trying to capture the value at the top of each peak.

[Image: graph of state_duration over time]
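The sequence-number idea can be sketched in Python on a plain list of state_duration values. Bump a counter at every -1, group consecutive rows by that counter, and keep the maximum of each group. Groups that contain only -1 (back-to-back lag events) are dropped. The series and the name run_peaks are hypothetical. In Flux, the same idea might be expressed with map() to flag the -1 rows, cumulativeSum() over that flag, then group() and max(). That query is an assumption we have not verified.

```python
from itertools import groupby


def run_peaks(state_durations):
    """Return the peak state_duration of each lag-free run.

    A sequence number is incremented at every -1 (lag event), rows are grouped
    by that number, and the max of each group is kept. Groups whose max is -1
    (lag events only) are skipped.
    """
    run_ids = []
    run_id = 0
    for sd in state_durations:
        if sd == -1:
            run_id += 1  # a lag event starts a new sequence
        run_ids.append(run_id)

    peaks = []
    for _, group in groupby(zip(run_ids, state_durations), key=lambda p: p[0]):
        peak = max(sd for _, sd in group)
        if peak >= 0:  # ignore groups made only of lag events
            peaks.append(peak)
    return peaks


# Hypothetical state_duration column (seconds), as stateDuration(unit: 1s) would emit it.
series = [0, 2, 4, 6, -1, 0, 2, -1, -1, 0, 2, 5]
print(run_peaks(series))  # [6, 2, 5]
```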

Any help is appreciated, including solutions in InfluxQL or with continuous queries.

This is what the data looks like when exported to CSV:

"time","timers.HeartbeatMs","timers.character"
"2021-01-12T14:49:34.000+01:00","2717","Tarek"
"2021-01-12T14:49:36.000+01:00","1282","Tarek"
"2021-01-12T14:49:38.000+01:00","2015","Tarek"
"2021-01-12T14:49:40.000+01:00","1984","Tarek"
"2021-01-12T14:49:42.000+01:00","2140","Tarek"
"2021-01-12T14:49:44.000+01:00","1937","Tarek"
"2021-01-12T14:49:46.000+01:00","2405","Tarek"
"2021-01-12T14:49:48.000+01:00","2312","Tarek"
"2021-01-12T14:49:50.000+01:00","1453","Tarek"
"2021-01-12T14:49:52.000+01:00","1890","Tarek"
"2021-01-12T14:49:54.000+01:00","2077","Tarek"
"2021-01-12T14:49:56.000+01:00","2250","Tarek"
"2021-01-12T14:49:59.000+01:00","2360","Tarek"
"2021-01-12T14:50:00.000+01:00","1453","Tarek"
"2021-01-12T14:50:02.000+01:00","1952","Tarek"
"2021-01-12T14:50:04.000+01:00","2108","Tarek"
"2021-01-12T14:50:06.000+01:00","2485","Tarek"
"2021-01-12T14:50:08.000+01:00","1437","Tarek"
"2021-01-12T14:50:10.000+01:00","2421","Tarek"
"2021-01-12T14:50:12.000+01:00","1483","Tarek"
"2021-01-12T14:50:14.000+01:00","2344","Tarek"
"2021-01-12T14:50:17.000+01:00","2437","Tarek"
"2021-01-12T14:50:18.000+01:00","1092","Tarek"
"2021-01-12T14:50:20.000+01:00","1969","Tarek"
"2021-01-12T14:50:22.000+01:00","2359","Tarek"
"2021-01-12T14:50:24.000+01:00","2140","Tarek"
"2021-01-12T14:50:27.000+01:00","2421","Tarek"

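Here is a minimal Python sketch, assuming the export format above, that parses the CSV and flags lag events using the 2750 ms threshold. Only the first four rows are inlined to keep it short. The names load_rows and is_lag are hypothetical.

```python
import csv
import io
from datetime import datetime

LAG_THRESHOLD_MS = 2750  # from the question: a period above 2750 ms is a lag event

# First rows of the export above, inlined so the sketch is self-contained.
SAMPLE_CSV = '''"time","timers.HeartbeatMs","timers.character"
"2021-01-12T14:49:34.000+01:00","2717","Tarek"
"2021-01-12T14:49:36.000+01:00","1282","Tarek"
"2021-01-12T14:49:38.000+01:00","2015","Tarek"
"2021-01-12T14:49:40.000+01:00","1984","Tarek"
'''


def load_rows(text):
    """Parse the exported CSV into (datetime, heartbeat_ms) tuples."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        (datetime.fromisoformat(row["time"]), int(row["timers.HeartbeatMs"]))
        for row in reader
    ]


def is_lag(heartbeat_ms):
    """True if the heartbeat period counts as a lag event."""
    return heartbeat_ms > LAG_THRESHOLD_MS


rows = load_rows(SAMPLE_CSV)
lag_rows = [(ts, ms) for ts, ms in rows if is_lag(ms)]
print(len(rows), len(lag_rows))  # 4 0
```

In the full excerpt the largest HeartbeatMs value is 2717 ms, so it contains no lag events and would form a single lag-free run.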
Solution

  • There are two ways I can think of. One is to look for the inverted state. The other is to use elapsed() to find the interval and timeShift() to emulate LAG().

    I don't like the latter, though I admit the first is not intuitive either :-(. I really hope features like LAG() or CurrentRecordIndex() become available in Flux.

    from (bucket: "sampledb/autogen")
    |> range(start: -1h)
    |> filter(fn: (r) =>
      r._measurement == "timers" and
      r._field == "HeartbeatMs" and
      r.character == "Tarek"
      )
    |> stateDuration(fn: (r) =>
        r._value > 2750,         // Look for the inverted state
        column: "inverted_state_duration",
        unit: 1s
      )
    |> keep(columns: ["_time","inverted_state_duration"])
    
    // Clear out records of the periods you are after (non-lag rows, where the
    // inverted state duration is -1), keeping only the lag-event records
    |> filter(fn: (r) => r["inverted_state_duration"] != -1)
    
    // Calculate the gap duration with elapsed()
    |> elapsed(columnName: "state_duration")
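    // Drop gaps between back-to-back lag records. The threshold (seconds) should be
    // max(stateDuration unit, record interval); 3 is a hypothetical value for ~2-3 s spacing
    |> filter(fn: (r) => r["state_duration"] > 3)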
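The inverted-state pipeline can be checked the same way. This Python sketch (not Flux) keeps only the lag records, as the comment "Clear out records of the periods you are after" describes. It then takes the time between consecutive lag records, as elapsed() would, and drops gaps that only separate adjacent lag records. The 3 s threshold stands in for max(stateDuration.unit, record interval). Like the readings and the name lag_free_gaps, it is a hypothetical value.

```python
from datetime import datetime, timedelta

LAG_THRESHOLD_MS = 2750
GAP_THRESHOLD_S = 3  # hypothetical stand-in for max(stateDuration unit, record interval)


def lag_free_gaps(rows, gap_threshold_s=GAP_THRESHOLD_S):
    """Mimic the inverted-state pipeline on time-sorted (timestamp, ms) rows.

    1. Keep only lag records (inverted state duration >= 0).
    2. Like elapsed(): whole seconds between consecutive kept records
       (the first kept record has no predecessor, so it yields no gap).
    3. Drop gaps that only separate adjacent lag records.
    """
    lag_times = [ts for ts, ms in rows if ms > LAG_THRESHOLD_MS]
    gaps = [int((b - a).total_seconds()) for a, b in zip(lag_times, lag_times[1:])]
    return [g for g in gaps if g > gap_threshold_s]


t0 = datetime(2021, 1, 12, 14, 49, 34)
# Hypothetical readings: (seconds offset, HeartbeatMs); values above 2750 are invented lag events.
readings = [(0, 3000), (2, 1282), (4, 2015), (6, 1984), (8, 2900), (10, 3050),
            (12, 2140), (14, 1937), (16, 2800)]
rows = [(t0 + timedelta(seconds=s), ms) for s, ms in readings]

print(lag_free_gaps(rows))  # [8, 6]
```

On this input, the stateDuration approach would report run peaks of 4 s and 2 s (non-lag records from 2 s to 6 s and from 12 s to 14 s). The elapsed() gaps are 8 s and 6 s, because each gap also spans the interval between a bounding lag record and its neighbouring non-lag record. Which definition of "time without lag" fits your statistics is a modelling choice.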