InfluxDB Flux query with a custom window aggregate function


Could you please help me with the InfluxDB 2 Flux query syntax for building a windowed query with a custom aggregate function?

I went through the online docs, but they seem to lack examples of how to get at the actual window content (first and last records) from within the custom aggregate function. They also don't clearly describe the expected signature of custom functions.

I'd like to build a query with a sliding window that would produce a difference between the first and the last value in the window. Something along these lines:

difference = (column, tables=<-) => ({ tables.last() - tables.first() })

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "simple")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: 1mo, fn: difference, column: "_value", timeSrc: "_stop", timeDst: "_time", createEmpty: true)
  |> yield(name: "diff")

The syntax of the above example is obviously wrong, but hopefully you can understand what I'm trying to do.

Thank you!


Solution

  • I came up with the following, which works at least syntactically:

    from(bucket: "my-bucket")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "simple")
      |> filter(fn: (r) => r["_field"] == "value")
      |> aggregateWindow(
          every: 1mo, 
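          fn: (column, tables=<-) => tables |> reduce(
                // -1.0 marks "no record seen yet"; this assumes values are never negative
                identity: {first: -1.0, last: -1.0, diff: -1.0},
                // reduce() passes its arguments by name, so they must be r and accumulator
                fn: (r, accumulator) => ({
                    first:
                        if accumulator.first < 0.0 then r._value
                        else accumulator.first,
                    last:
                        r._value,
                    // use the current record so that diff ends up as last - first
                    diff:
                        if accumulator.first < 0.0 then 0.0
                        else (r._value - accumulator.first)
                })
              )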
              |> drop(columns: ["first", "last"])
              |> set(key: "_field", value: column)
              |> rename(columns: {diff: "_value"})
          )
      |> yield(name: "diff")
    

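    The window is not really sliding, though: since only the every parameter is set, consecutive windows do not overlap.

    The same logic with a sliding window, where window() is given an every shorter than its period: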

    from(bucket: "my-bucket")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "simple")
      |> filter(fn: (r) => r["_field"] == "value")
      |> window(every: 1h, period: 1mo)
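      |> reduce(
        // -1.0 marks "no record seen yet"; this assumes values are never negative
        identity: {first: -1.0, last: -1.0, diff: -1.0},
        // reduce() passes its arguments by name, so they must be r and accumulator
        fn: (r, accumulator) => ({
            first:
                if accumulator.first < 0.0 then r._value
                else accumulator.first,
            last:
                r._value,
            // use the current record so that diff ends up as last - first
            diff:
                if accumulator.first < 0.0 then 0.0
                else (r._value - accumulator.first)
        })
      )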
      |> duplicate(column: "_stop", as: "_time")
      |> drop(columns: ["first", "last"])
      |> rename(columns: {diff: "_value"})
      |> window(every: inf)
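
    The first/last fold can also be tried without a bucket. The following is only a sketch under stated assumptions: the sample rows built with array.from() are made up for illustration, and the count field is a hypothetical replacement for the -1.0 sentinel, so that negative values are handled as well. It uses plain one-hour windows to keep the result easy to check by hand.

```flux
import "array"

// Hypothetical sample data, not taken from the question
data =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, _value: 10.0},
            {_time: 2023-01-01T00:20:00Z, _value: 12.5},
            {_time: 2023-01-01T00:40:00Z, _value: 11.0},
            {_time: 2023-01-01T01:10:00Z, _value: 15.0},
            {_time: 2023-01-01T01:50:00Z, _value: 19.0},
        ],
    )

data
    // range() adds the _start and _stop columns that window() relies on
    |> range(start: 2023-01-01T00:00:00Z, stop: 2023-01-01T02:00:00Z)
    |> window(every: 1h)
    |> reduce(
        // count (an assumption of this sketch) tracks whether a record was seen yet
        identity: {first: 0.0, last: 0.0, count: 0},
        fn: (r, accumulator) =>
            ({
                first: if accumulator.count == 0 then r._value else accumulator.first,
                last: r._value,
                count: accumulator.count + 1,
            }),
    )
    // compute the difference once per window, after the fold has finished
    |> map(fn: (r) => ({r with _value: r.last - r.first}))
    |> duplicate(column: "_stop", as: "_time")
    |> drop(columns: ["first", "last", "count"])
    // merge the per-window tables back into a single table
    |> window(every: inf)
```

    With these rows, the two windows should come out as 11.0 - 10.0 and 19.0 - 15.0. Replacing window(every: 1h) with something like window(every: 30m, period: 1h) would make the windows overlap, at the cost of partial windows at the edges of the range.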