Search code examples
databaseinfluxdbflux

InfluxDB: Merge Time Series Columns and multiply them


I try to join two tables: currents and voltages Both are measured at the same time, with just some ms of delay. When I try to merge their Columns using union, I get a lot of nulls. enter image description here

My goal is to multiply both columns, in order to calculate the power. But therefore they must be aligned. How can I do that and multiply them afterwards using Flux.

voltages = from(bucket: "data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "power_supply")
  |> filter(fn: (r) => r["device"] == "multi_1")
  |> filter(fn: (r) => r["_field"] == "ch1_measured_voltage_(V)")
  |> aggregateWindow(every: v.windowPeriod, fn: mean)
  |> duplicate(column: "_value", as: "voltage")
  |> keep(columns: ["_time", "voltage"])
  |> yield(name: "voltage")

currents = from(bucket: "data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "power_supply")
  |> filter(fn: (r) => r["device"] == "multi_1")
  |> filter(fn: (r) => r["_field"] == "ch1_measured_current_(A)")
  |> aggregateWindow(every: v.windowPeriod, fn: mean)
  |> duplicate(column: "_value", as: "current")
  |> keep(columns: ["_time", "current"])
  |> yield(name: "current")


comb = union(tables: [voltages, currents])
 |> sort(columns: ["_time"], desc: false)
 |> yield(name: "mean")

Solution

  • You can use pivot to do this, and remember to sort after use pivot:

    from(bucket: "data")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "power_supply")
      |> filter(fn: (r) => r["device"] == "multi_1")
      |> filter(fn: (r) => r["_field"] == "ch1_measured_voltage_(V)" or r["_field"] == "ch1_measured_current_(A)")
      |> aggregateWindow(every: v.windowPeriod, fn: mean)
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
      |> sort(columns: ["_time"])
      |> map(fn: (r) => ({r with _value: r["ch1_measured_voltage_(V)"] * r["ch1_measured_current_(A)"]}))
      |> yield(name: "mean")