Search code examples
azure-stream-analytics

How to inject (append) a windowed average into the output using MS Stream Analytics


Given a JSON stream that resembles:

{ "timestamp": "2017-01-26T20:27:26.099Z", "Novato": { "humidity": "40.996", "barometric": "1011.2" }, "Redmond": { "humidity": "60.832", "barometric": "1011.8" } }

For each City in this object, I want to add a new value called humidity_5_second_avg, which is a 5 second tumbling window average.

But of course for each city, it needs to be unique to that city. And I want to append it to the existing cities' values.

For example:

{ "timestamp": "2017-01-26T20:27:26.099Z", "Novato": { "humidity": "40.996", "barometric": "1011.2", "humidity_5_second_avg": "38.1234" }, "Redmond": { "humidity": "60.832", "barometric": "1011.8", "humidity_5_second_avg": "32.1234" } }

Is this possible with a Stream Analytics query? Or would I need to create two streams (one with the original data, and one with only average data, and merge them together?


Solution

  • This is tricky to get exactly in the way described. It’s easier to break down city information into one row per city first and then use JOIN.

    -- Use CROSS APPLY to split original events into one row per city
    WITH CityData AS
    (
        SELECT 
            r.PropertyName AS City,
            r.PropertyValue.*
        FROM localinput i TIMESTAMP BY timestamp 
        CROSS APPLY GetRecordProperties(i) r
        WHERE r.PropertyValue.humidity IS NOT NULL
    ),
    Averages AS
    (
        SELECT 
            City,
            AVG(humidity) as avg_humidity 
        FROM CityData
        GROUP BY city, TumblingWindow(second, 5)
    )
    
    SELECT *, System.Timestamp as ts INTO debug FROM Averages
    
    SELECT 
        c.*, a.avg_humidity
    FROM CityData c
    JOIN Averages a
    ON c.City = a.City AND DATEDIFF(second, c, a) BETWEEN 0 AND 5