My Stream Analytics job is ingesting a stream of messages of different types. Some are of type: telemetry, others are of type: flags.
I need to calculate the average over a moving window of some telemetry values, but only consider those values that are preceded by a flag message with a value of true.
Put another way: the flag message turns on/off whether or not the telemetry is considered for calculation of the average.
I have tried the following query:
SELECT
devId,
type,
AVG ("myValue") OVER ( LIMIT DURATION (second, 30) WHEN
LAG (value) OVER (LIMIT DURATION(minute, 5) WHEN type='flag' and text='myValueFlag') = 1
) as 'myValueAvg',
MAX("ts") as 'ts'
INTO "eventhub-output"
FROM "iothub-input" TIMESTAMP BY "ts"
WHERE type = 'telemetry'
GROUP BY devId, type, SlidingWindow(second, 30)
But I get this error message:
In a query with GROUP BY, Analytic functions can not be used neither in the GROUP BY nor in SELECT, unless they are argument to an aggregate function such as in SUM(LAG(x) OVER(LIMIT DURATION(ss, 5)).
I am not sure how to proceed from here. The documentation for LAG says:
LAG isn't affected by predicates in the WHERE clause, join conditions in the JOIN clause, or grouping expressions in the GROUP BY clause of the current query because it's evaluated before those clauses.
So I assumed the above query should work.
I got it to work, finally. The trick was not to use the AVG OVER clause at all. Instead, I am using a simple AVG over a subquery in which I filter the relevant data points.
WITH relevant_data AS (
SELECT
devId,
myValue,
ts
FROM "iothub-input" TIMESTAMP BY "ts"
WHERE myValue IS NOT NULL
AND type = 'telemetry'
AND LAG (value,1,1) OVER ( LIMIT DURATION(minute, 5) WHEN type='flag' and text='myValueFlag') = 1
)
SELECT
devId,
AVG ( myValue ) as 'myValueAvg',
MAX("ts") as 'ts'
FROM relevant_data
GROUP BY devId, SlidingWindow(second, 30)
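
To sanity-check what this query is meant to compute, here is a small plain-Python simulation of the same logic. It is only a sketch of my understanding, not Stream Analytics code: the Event class and the helper names are made up for illustration, it assumes a single device with events already sorted by ts (in seconds), and it treats the 5-minute lookback boundary as inclusive, which I have not verified against the service.

```python
from dataclasses import dataclass
from typing import Optional

FLAG_LOOKBACK_S = 5 * 60  # mirrors LIMIT DURATION(minute, 5)
WINDOW_S = 30             # mirrors SlidingWindow(second, 30)
DEFAULT_FLAG = 1          # mirrors the third argument of LAG(value, 1, 1)


@dataclass
class Event:  # hypothetical message shape, for illustration only
    ts: float                         # seconds, assumed sorted ascending
    type: str                         # 'telemetry' or 'flag'
    text: Optional[str] = None        # flag name on flag events
    value: Optional[int] = None       # flag state (1 or 0) on flag events
    my_value: Optional[float] = None  # measurement on telemetry events


def relevant_data(events):
    """Keep telemetry whose most recent matching flag is 1 (or missing)."""
    last_flag = None  # (ts, value) of the latest matching flag seen so far
    kept = []
    for e in events:
        if e.type == 'flag' and e.text == 'myValueFlag':
            last_flag = (e.ts, e.value)
            continue
        if e.type != 'telemetry' or e.my_value is None:
            continue
        if last_flag is not None and e.ts - last_flag[0] <= FLAG_LOOKBACK_S:
            flag = last_flag[1]
        else:
            flag = DEFAULT_FLAG  # no recent flag: the default switches it on
        if flag == 1:
            kept.append((e.ts, e.my_value))
    return kept


def window_avg(points, end_ts):
    """Average of points with end_ts - WINDOW_S < ts <= end_ts, else None."""
    vals = [v for ts, v in points if end_ts - WINDOW_S < ts <= end_ts]
    return sum(vals) / len(vals) if vals else None


events = [
    Event(0, 'telemetry', my_value=10.0),          # no flag yet: default applies
    Event(5, 'flag', text='myValueFlag', value=0),   # switch off
    Event(10, 'telemetry', my_value=100.0),        # ignored
    Event(15, 'flag', text='myValueFlag', value=1),  # switch on
    Event(20, 'telemetry', my_value=20.0),         # counted
]
kept = relevant_data(events)
print(kept)
print(window_avg(kept, 20))
```

Note that the default of 1 in LAG (value,1,1) means telemetry is counted whenever no matching flag was seen in the last 5 minutes. Also, the LAG in my query has no PARTITION BY, so as far as I understand it, a flag from one device would gate telemetry from all devices; the simulation sidesteps that by assuming one device.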