
Average over some values, determined by entries from other messages


My Stream Analytics job ingests a stream of messages of different types: some have type 'telemetry', others have type 'flag'. I need to calculate a moving-window average of some telemetry values, but only over values that are preceded by a flag message with a value of true.

Put another way, the flag message switches on or off whether the telemetry is included in the average.
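The following minimal Python sketch pins down the intended semantics of the gate. It is a local simulation, not Stream Analytics code. A telemetry value is kept only if the most recent myValueFlag message had value 1. The following are assumptions, chosen to match the LAG (value,1,1) OVER (LIMIT DURATION(minute, 5) ...) call in the solution:

- the event shape (dicts with ts in seconds, type, text, value and myValue keys)
- the 5-minute lookback
- the "on" default when no recent flag exists

```python
from typing import Any

# Assumptions chosen to mirror the solution's LAG (value,1,1) OVER (LIMIT DURATION(minute, 5) ...):
LOOKBACK_SECONDS = 5 * 60  # how far back a flag message still counts
DEFAULT_FLAG = 1           # gate value when no recent flag exists


def gated_telemetry(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the telemetry events whose most recent 'myValueFlag' flag is 1.

    events must be sorted by 'ts' (seconds). As in the query, flags are not
    partitioned by device, so a flag from any device gates all telemetry.
    The lookback boundary is treated as inclusive (an assumption).
    """
    kept = []
    last_flag_ts = None
    last_flag_value = None
    for ev in events:
        if ev["type"] == "flag" and ev.get("text") == "myValueFlag":
            # Remember the latest flag; it switches the gate on (1) or off (0).
            last_flag_ts, last_flag_value = ev["ts"], ev["value"]
        elif ev["type"] == "telemetry" and ev.get("myValue") is not None:
            recent = (
                last_flag_ts is not None
                and ev["ts"] - last_flag_ts <= LOOKBACK_SECONDS
            )
            gate = last_flag_value if recent else DEFAULT_FLAG
            if gate == 1:
                kept.append(ev)
    return kept


events = [
    {"ts": 0, "type": "flag", "text": "myValueFlag", "value": 0},
    {"ts": 10, "type": "telemetry", "devId": "d1", "myValue": 5.0},
    {"ts": 20, "type": "flag", "text": "myValueFlag", "value": 1},
    {"ts": 30, "type": "telemetry", "devId": "d1", "myValue": 7.0},
]
print([e["myValue"] for e in gated_telemetry(events)])  # [7.0]
```

Here the value 5.0 is dropped because the flag before it was 0. The value 7.0 is kept because the flag was switched back to 1.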

I have tried the following query:

SELECT
    devId,
    type,
    AVG ("myValue") OVER ( LIMIT DURATION (second, 30) WHEN
        LAG (value) OVER (LIMIT DURATION(minute, 5) WHEN type='flag' and text='myValueFlag') = 1
    ) as 'myValueAvg',
    MAX("ts") as 'ts'
INTO "eventhub-output"
FROM "iothub-input" TIMESTAMP BY "ts"
WHERE type = 'telemetry'
GROUP BY devId, type, SlidingWindow(second, 30)

But I get this error message:

In a query with GROUP BY, Analytic functions can not be used neither in the GROUP BY nor in SELECT, unless they are argument to an aggregate function such as in SUM(LAG(x) OVER(LIMIT DURATION(ss, 5)).

I am not sure how to proceed from here. The documentation for LAG says:

LAG isn't affected by predicates in the WHERE clause, join conditions in the JOIN clause, or grouping expressions in the GROUP BY clause of the current query because it's evaluated before those clauses.

So I assumed the query above would work.


Solution

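  • I finally got it to work. The trick was not to use AVG ... OVER at all. Instead, I compute a plain AVG over a subquery (a WITH step) that filters out the irrelevant data points.
    - The error message forbids analytic functions in the SELECT of a query with GROUP BY. The LAG therefore moves into the WITH step, which has no GROUP BY.
    - LAG is evaluated before the WHERE clause, so it still sees the flag messages even though the step keeps only telemetry rows.
    - The third argument of LAG (value,1,1) is the default returned when no matching flag message arrived within the last 5 minutes. In that case the telemetry is included.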

    -- Step 1: keep only telemetry rows whose most recent flag is on
    WITH relevant_data AS (
        SELECT
            devId,
            myValue,
            ts
        FROM "iothub-input" TIMESTAMP BY "ts"
        WHERE myValue IS NOT NULL
        AND type = 'telemetry'
        AND LAG (value, 1, 1) OVER (LIMIT DURATION(minute, 5) WHEN type = 'flag' AND text = 'myValueFlag') = 1
    )

    -- Step 2: plain per-device sliding-window average over the filtered rows
    SELECT
        devId,
        AVG ( myValue ) as 'myValueAvg',
        MAX("ts") as 'ts'
    INTO "eventhub-output"
    FROM relevant_data
    GROUP BY devId, SlidingWindow(second, 30)
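To sanity-check what the second step computes, here is a minimal Python sketch of a per-device 30-second sliding average over rows that already passed the flag filter. It is a local simulation, not Stream Analytics code. The row shape (dicts with devId, ts in seconds and myValue) and the window boundary handling are assumptions.

One deliberate simplification: as I understand the documentation, SlidingWindow also emits a result when an event leaves the window. This sketch only emits when a row arrives, so MAX(ts) is simply the arriving row's timestamp.

```python
from collections import defaultdict, deque
from typing import Any

WINDOW_SECONDS = 30  # assumed to mirror SlidingWindow(second, 30)


def sliding_averages(rows: list[dict[str, Any]]) -> list[tuple[str, float, Any]]:
    """For each row (sorted by 'ts'), return (devId, average myValue of that
    device over the window (ts - WINDOW_SECONDS, ts], ts).

    Simplification: results are emitted only when a row enters the window,
    so the window's MAX(ts) is always the arriving row's ts.
    """
    windows: dict[str, deque] = defaultdict(deque)
    out = []
    for row in rows:
        win = windows[row["devId"]]  # one window per device, like GROUP BY devId
        win.append((row["ts"], row["myValue"]))
        # Evict values that are 30 seconds or more older than the current row.
        while win[0][0] <= row["ts"] - WINDOW_SECONDS:
            win.popleft()
        avg = sum(value for _, value in win) / len(win)
        out.append((row["devId"], avg, row["ts"]))
    return out


rows = [
    {"devId": "d1", "ts": 0, "myValue": 2.0},
    {"devId": "d1", "ts": 10, "myValue": 4.0},
    {"devId": "d2", "ts": 15, "myValue": 10.0},
    {"devId": "d1", "ts": 40, "myValue": 6.0},
]
for result in sliding_averages(rows):
    print(result)
# ('d1', 2.0, 0)
# ('d1', 3.0, 10)
# ('d2', 10.0, 15)
# ('d1', 6.0, 40)
```

Keeping one deque per device mirrors the GROUP BY devId. By the time the d1 row at ts 40 arrives, both earlier d1 values have aged out of the window.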