
Dynamic Filtering using variable in filter function in Flux


Using the quantile function, I was able to get the 95th percentile value of a stream.

Now I want to keep only the records that lie below the 95th percentile, so I loop over my records and compare each one against the percentile value. However, at this point I get an error.

Please find the code below:

percentile = totalTimeByDoc
  |> filter(fn: (r) => r["documentType"] == "PurchaseOrder")
  |> group(columns: ["documentType"])
//   |> yield()
  |> quantile(column: "processTime", q: 0.95, method: "estimate_tdigest", compression: 9999.0)
  |> limit(n: 1)
  |> rename(columns: {processTime: "pt"})

This gives me the following data:

0 PurchaseOrder 999

Now I try to loop over my records and filter:

percentile_filtered = totalTimeByDoc
  |> filter(fn: (r) => r["documentType"] == "PurchaseOrder")
  |> filter(fn: (r) => r.processTime < percentile[0]["pt"])
  |> yield()

where totalTimeByDoc looks like this:

|0|PurchaseOrder|testpass22PID230207222747-1|1200|
|1|PurchaseOrder|testpass22PID230207222747-2|807|
|2|PurchaseOrder|testpass22PID230207222934-1|671|
|3|PurchaseOrder|testpass22PID230207222934-2|670|

I get the following error from the above query:

 error @116:41-116:51: expected [{A with pt: B}] (array) but found stream[{A with pt: B}]

Solution

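  • You are only missing the step that extracts the column from the percentile stream. As the error says, percentile is a stream of tables, not an array, so it cannot be indexed with percentile[0]["pt"]. Have a look at "Extract scalar values" in the Flux documentation. In this case, you could do: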

    percentile = totalTimeByDoc
      |> ...
      |> rename(columns: {processTime: "pt"})
      |> findColumn(fn: (key) => true, column: "pt")
    
    percentile_filtered = totalTimeByDoc
      |> filter(fn: (r) => r["documentType"] == "PurchaseOrder")
      |> filter(fn: (r) => r.processTime < percentile[0])
      |> yield()
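
For anyone who wants to try the fix without the original bucket, here is a minimal sketch that builds the four sample rows from the question with array.from and applies the same findColumn step. Some parts are assumptions rather than the asker's actual setup: the column name docId is hypothetical (the question does not name that column), processTime is written as a float on the assumption that quantile expects float input, and the variable names pts and p95 are made up for illustration.

```flux
import "array"

// Sample rows copied from the question. "docId" is a hypothetical column name.
totalTimeByDoc =
    array.from(
        rows: [
            {documentType: "PurchaseOrder", docId: "testpass22PID230207222747-1", processTime: 1200.0},
            {documentType: "PurchaseOrder", docId: "testpass22PID230207222747-2", processTime: 807.0},
            {documentType: "PurchaseOrder", docId: "testpass22PID230207222934-1", processTime: 671.0},
            {documentType: "PurchaseOrder", docId: "testpass22PID230207222934-2", processTime: 670.0},
        ],
    )

// findColumn turns the stream into a plain array of the column's values,
// taken from the first table whose group key matches the predicate.
pts =
    totalTimeByDoc
        |> filter(fn: (r) => r.documentType == "PurchaseOrder")
        |> group(columns: ["documentType"])
        |> quantile(column: "processTime", q: 0.95, method: "estimate_tdigest", compression: 9999.0)
        |> findColumn(fn: (key) => key.documentType == "PurchaseOrder", column: "processTime")

// An array can be indexed; this is now a scalar float.
p95 = pts[0]

// Keep only the rows strictly below the 95th percentile.
totalTimeByDoc
    |> filter(fn: (r) => r.documentType == "PurchaseOrder" and r.processTime < p95)
    |> yield(name: "below_p95")
```

Pulling the scalar into its own variable before the second pipeline keeps the filter predicate a plain float comparison. If the quantile stream were empty, pts would be an empty array and pts[0] would fail, so a length check may be worth adding for real data.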