Given a JSON stream that resembles:
{
    "timestamp": "2017-01-26T20:27:26.099Z",
    "Novato": {
        "humidity": "40.996",
        "barometric": "1011.2"
    },
    "Redmond": {
        "humidity": "60.832",
        "barometric": "1011.8"
    }
}
For each city in this object, I want to add a new value called humidity_5_second_avg, which is the average humidity over a 5-second tumbling window.
The average must be computed separately for each city, and I want to append it to that city's existing values.
For example:
{
    "timestamp": "2017-01-26T20:27:26.099Z",
    "Novato": {
        "humidity": "40.996",
        "barometric": "1011.2",
        "humidity_5_second_avg": "38.1234"
    },
    "Redmond": {
        "humidity": "60.832",
        "barometric": "1011.8",
        "humidity_5_second_avg": "32.1234"
    }
}
Is this possible with a Stream Analytics query? Or would I need to create two streams (one with the original data and one with only the average data) and merge them together?
This is tricky to get in exactly the shape described. It's easier to break the city information down into one row per city first and then use JOIN to attach the averages.
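-- Use CROSS APPLY to split original events into one row per city
WITH CityData AS
(
    SELECT
        r.PropertyName AS City,
        r.PropertyValue.*
    FROM localinput i TIMESTAMP BY timestamp
    CROSS APPLY GetRecordProperties(i) r
    WHERE r.PropertyValue.humidity IS NOT NULL
),
-- Average humidity per city over each 5-second tumbling window
Averages AS
(
    SELECT
        City,
        -- humidity is a string in the sample JSON, so cast it before averaging
        AVG(CAST(humidity AS float)) AS avg_humidity
    FROM CityData
    GROUP BY City, TumblingWindow(second, 5)
)
-- Optional: write the averages to a separate output (named debug here) for inspection
SELECT *, System.Timestamp AS ts INTO debug FROM Averages
-- Join each per-city row with the average whose window ends 0 to 5 seconds after it
SELECT
    c.*, a.avg_humidity
FROM CityData c
JOIN Averages a
ON c.City = a.City AND DATEDIFF(second, c, a) BETWEEN 0 AND 5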
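Since a Stream Analytics query cannot be run outside the service, here is a rough Python sketch of what the query above computes, in case it helps to check the logic offline. It is only an approximation under a few assumptions: the function names are made up for illustration, windows are taken to be aligned to multiples of 5 seconds and to include their end time, and the DATEDIFF join is simplified to "attach the average of the window the event falls in".

```python
import math
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 5  # same size as TumblingWindow(second, 5)


def parse_ts(text):
    # Parse timestamps shaped like "2017-01-26T20:27:26.099Z" as UTC.
    parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def split_by_city(event):
    # Mirrors the CityData step: one row per city that has a humidity value.
    ts = parse_ts(event["timestamp"])
    rows = []
    for name, value in event.items():
        if isinstance(value, dict) and value.get("humidity") is not None:
            rows.append({"City": name, "ts": ts, **value})
    return rows


def window_end(ts):
    # End (epoch seconds) of the tumbling window (end - 5, end] containing ts.
    return math.ceil(ts.timestamp() / WINDOW_SECONDS) * WINDOW_SECONDS


def add_window_average(events):
    # Mirrors Averages plus the final JOIN, simplified to one window per row.
    rows = [row for event in events for row in split_by_city(event)]
    buckets = defaultdict(list)
    for row in rows:
        key = (row["City"], window_end(row["ts"]))
        buckets[key].append(float(row["humidity"]))  # humidity is a string
    averages = {key: sum(vals) / len(vals) for key, vals in buckets.items()}
    return [
        {**row, "avg_humidity": averages[(row["City"], window_end(row["ts"]))]}
        for row in rows
    ]
```

Like the query, this produces one flat row per city (City, humidity, barometric, avg_humidity) rather than the nested object shown in the question, so the rows would still have to be regrouped downstream if the nested shape is required.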