I'm currently setting up a Stream Analytics job that forwards IoT-device Event Hub data (JSON payloads) from multiple devices to a database.
The job currently forwards everything to a SQL table, which is great (the easy part). However, I only want Stream Analytics to forward an event when the payload data has changed since the last sample received from that specific device.
Obviously performing this comparison within the database is inefficient; it should be handled within the stream-processing logic before the data hits SQL.
The JSON payload (example below) contains a timestamp value that must be ignored in this comparison - the primary focus is on determining whether the six channel values (INT) have changed since the last sample.
{
  "pid": 51387408,
  "device_id": "WRL11111111",
  "channels": {
    "timestamp": 1713019679,
    "chd1_value": 0,
    "chd2_value": 0,
    "chd3_value": 0,
    "chd4_value": 0,
    "chd5_value": 0,
    "chd6_value": 0
  }
}
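To be clear about the behaviour I'm after, here is a rough Python sketch of the filter (illustrative only - the real job runs in Stream Analytics, and `should_forward` is a hypothetical helper, not part of my job): keep the last-seen channel values per device and forward only when they differ, ignoring `timestamp`.

```python
# Sketch of the desired per-device change filter (illustrative only).
last_seen = {}  # device_id -> channel values from that device's previous sample

def should_forward(payload: dict) -> bool:
    """Return True if this device's channel values changed since its last sample."""
    device_id = payload["device_id"]
    # Drop the timestamp before comparing - only chd1_value..chd6_value matter.
    channels = {k: v for k, v in payload["channels"].items() if k != "timestamp"}
    changed = last_seen.get(device_id) != channels
    last_seen[device_id] = channels
    return changed
```

The first sample from a device always counts as changed, since there is nothing to compare against.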
I have tried multiple iterations of Stream Analytics queries that attempt this comparison using the LAG function, but the examples found online don't cover device-specific comparisons.
Queries such as the following throw errors like: 'LAG' does not allow an 'Order By' clause.
WITH PreviousEvent AS (
    SELECT
        device_id,
        device_data,
        LAG(device_data) OVER (PARTITION BY device_id ORDER BY EventEnqueuedUtcTime) AS previous_data
    FROM
        device_stream
)
SELECT
    device_id,
    device_data,
    previous_data,
    CASE WHEN device_data != previous_data THEN 'Value changed' ELSE 'Value unchanged' END AS change_status
FROM
    PreviousEvent
I'm sure this is a frequent requirement for Stream Analytics queries, and I'm surprised I can't find any concrete examples showing how it's achieved. Any advice or guidance is appreciated.
In Stream Analytics, LAG does not support an ORDER BY clause - events are implicitly ordered by their timestamp (see this SO thread and Use LAG in Azure Stream Analytics). The simplest fix is to drop the ORDER BY and bound the lookback instead, e.g. LAG(device_data) OVER (PARTITION BY device_id LIMIT DURATION(hour, 1)). If you need an explicit ordering, use the CollectTop function in Azure Stream Analytics:
WITH PreviousEvent AS (
    SELECT
        IoTHub.ConnectionDeviceId AS device_id,
        CurrentTemperature AS current_temperature,
        EventEnqueuedUtcTime,
        -- CollectTop collects the most recent event per device within the window
        CollectTop(1) OVER (
            PARTITION BY IoTHub.ConnectionDeviceId
            LIMIT DURATION(hour, 1)
            ORDER BY EventEnqueuedUtcTime ASC
        ) AS previous_event
    FROM
        [certificate122ravi89]
        TIMESTAMP BY EventEnqueuedUtcTime
)
SELECT
    pe.device_id AS DeviceId,
    pe.current_temperature AS CurrentTemperature,
    -- CollectTop returns an array of {rank, value} records; take the first one
    GetArrayElement(pe.previous_event, 0).value.CurrentTemperature AS PreviousTemperature,
    -- Determine whether the temperature changed
    CASE
        WHEN pe.current_temperature != GetArrayElement(pe.previous_event, 0).value.CurrentTemperature THEN 'Value changed'
        ELSE 'Value unchanged'
    END AS change_status
FROM
    PreviousEvent pe
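Outside Stream Analytics, the per-device previous-event lookup that LAG performs (bounded by LIMIT DURATION, with no explicit ORDER BY) can be sketched in Python. This is a hypothetical helper for reasoning about the semantics, with the one-hour retention window assumed to mirror LIMIT DURATION(hour, 1):

```python
WINDOW_SECONDS = 3600  # mirrors LIMIT DURATION(hour, 1)

# device_id -> (enqueued_time_seconds, device_data) of that device's latest event
_previous = {}

def lag_per_device(device_id, enqueued_time, device_data):
    """Return the previous event's data for this device, or None when there is
    no previous event or it falls outside the retention window."""
    prev = _previous.get(device_id)
    _previous[device_id] = (enqueued_time, device_data)
    if prev is None or enqueued_time - prev[0] > WINDOW_SECONDS:
        return None
    return prev[1]
```

Each device is tracked independently, which is exactly what PARTITION BY device_id gives you in the job itself.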
These are the steps to send data from Azure IoT Hub to SQL Server using a Stream Analytics job:
I sent sample data to the IoT Hub using the code below; replace it with the sample data you want to send.
The code sends data from a device to an IoT Hub using the Azure IoT Python SDK. Each message contains an energy type (randomly chosen from "Gas", "Water", or "Electricity") and a value (a random float between 0 and 100).
from azure.iot.device import IoTHubDeviceClient, Message
import json
import random
import time

# Connection string for the device
CONNECTION_STRING = ""

def send_data(device_client):
    # Sample data
    data = {
        "energyType": random.choice(["Gas", "Water", "Electricity"]),
        "value": random.uniform(0, 100)
    }
    # Convert data to JSON and create a message
    message = Message(json.dumps(data))
    # Send the message to IoT Hub
    print("Sending message:", message)
    device_client.send_message(message)
    print("Message sent successfully")

if __name__ == "__main__":
    try:
        # Create a device client
        client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
        while True:
            # Send data periodically
            send_data(client)
            time.sleep(5)  # Adjust the interval as needed
    except KeyboardInterrupt:
        print("IoTHubClient sample stopped")
Create a SQL server, a SQL database, and a table for the above data.
CREATE TABLE energy_data (
    EventId INT PRIMARY KEY IDENTITY(1,1),     -- Unique ID for each event
    EnergyType VARCHAR(20),                    -- Type of energy (e.g., Gas, Water, Electricity)
    Value FLOAT,                               -- Value of the energy type
    EventProcessedUtcTime DATETIME,            -- Time when the event was processed, in UTC
    PartitionId INT,                           -- Partition ID for the event
    EventEnqueuedUtcTime DATETIME,             -- Time when the event was enqueued in Event Hub, in UTC
    ConnectionDeviceId VARCHAR(50),            -- Connection device ID
    ConnectionDeviceGenerationId VARCHAR(50),  -- Connection device generation ID
    EnqueuedTime DATETIME                      -- Time when the event was enqueued in IoT Hub, in UTC
);
In the Stream Analytics job, select your IoT Hub as the input and your SQL database as the output, and set the partition key to 0.
SELECT
    energyType AS EnergyType,
    value AS Value,
    EventProcessedUtcTime,
    PartitionId,
    EventEnqueuedUtcTime,
    IoTHub.ConnectionDeviceId AS ConnectionDeviceId,
    IoTHub.ConnectionDeviceGenerationId AS ConnectionDeviceGenerationId,
    IoTHub.EnqueuedTime AS EnqueuedTime
INTO [sql]
FROM [IoTHub];
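The query above is a pass-through that renames event fields to the table's column names. In Python terms the mapping is roughly as follows (`to_sql_row` is a hypothetical helper for illustration, assuming the event shape produced by IoT Hub routing):

```python
def to_sql_row(event: dict) -> dict:
    """Mirror the Stream Analytics projection: map payload and IoTHub metadata
    fields onto the energy_data column names used in the query above."""
    iothub = event["IoTHub"]  # nested metadata record added by IoT Hub
    return {
        "EnergyType": event["energyType"],
        "Value": event["value"],
        "EventProcessedUtcTime": event["EventProcessedUtcTime"],
        "PartitionId": event["PartitionId"],
        "EventEnqueuedUtcTime": event["EventEnqueuedUtcTime"],
        "ConnectionDeviceId": iothub["ConnectionDeviceId"],
        "ConnectionDeviceGenerationId": iothub["ConnectionDeviceGenerationId"],
        "EnqueuedTime": iothub["EnqueuedTime"],
    }
```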
Run the job, then go to the SQL database and run the following query to see the data.
SELECT * FROM [dbo].[energy_data]