azure-iot-hub azure-stream-analytics azure-sql-managed-instance

Stream Analytics - Output Device Messages Only Upon Change


I'm currently setting up a job that forwards IoT device data from an Event Hub (JSON payloads) for multiple devices to a database.

The job currently writes to a SQL table, which works (that was the easy part). However, I only want Stream Analytics to forward a payload when its data has changed since the last sample received from that specific device.

Performing this check in the database is inefficient, so it should be handled in the stream processing logic before the data reaches SQL.

The JSON payload (example below) contains a timestamp value that must be ignored in this comparison; the goal is to determine whether any of the 6 channel values (INT) have changed since the last sample.

  {
    "pid": 51387408,
    "device_id": "WRL11111111",
    "channels": {
      "timestamp": 1713019679,
      "chd1_value": 0,
      "chd2_value": 0,
      "chd3_value": 0,
      "chd4_value": 0,
      "chd5_value": 0,
      "chd6_value": 0
    }
  }
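The comparison described above (ignore the timestamp, compare the six channel values with the previous sample from the same device) can be pinned down with a small local Python sketch. This is not Stream Analytics code; it only illustrates the intended per-device semantics. The helper names and the second device ID are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Sequence, Tuple

# The six channel fields to compare; "timestamp" is deliberately left out.
CHANNEL_KEYS = tuple(f"chd{i}_value" for i in range(1, 7))


def make_payload(pid: int, device_id: str, timestamp: int, values: Sequence[int]) -> dict:
    """Build a payload with the same shape as the example JSON (hypothetical helper)."""
    channels = {"timestamp": timestamp}
    channels.update(zip(CHANNEL_KEYS, values))
    return {"pid": pid, "device_id": device_id, "channels": channels}


def channel_values(payload: dict) -> Tuple[int, ...]:
    """Return the six channel values, ignoring the timestamp."""
    return tuple(payload["channels"][key] for key in CHANNEL_KEYS)


def changed_only(payloads: Iterable[dict]) -> Iterator[dict]:
    """Yield a payload only when its channel values differ from the previous
    payload of the same device_id; a device's first payload is always yielded."""
    last_seen: Dict[str, Tuple[int, ...]] = {}
    for payload in payloads:
        device = payload["device_id"]
        values = channel_values(payload)
        if last_seen.get(device) != values:
            yield payload
        last_seen[device] = values


# Hypothetical sample stream for two devices.
SAMPLES = [
    make_payload(1, "WRL11111111", 1713019679, [0, 0, 0, 0, 0, 0]),
    make_payload(2, "WRL11111111", 1713019689, [0, 0, 0, 0, 0, 0]),  # only timestamp changed
    make_payload(3, "WRL22222222", 1713019690, [0, 0, 0, 0, 0, 0]),  # first sample for this device
    make_payload(4, "WRL11111111", 1713019699, [1, 0, 0, 0, 0, 0]),  # chd1 changed
    make_payload(5, "WRL22222222", 1713019700, [0, 0, 0, 0, 0, 0]),  # unchanged
]

if __name__ == "__main__":
    print([p["pid"] for p in changed_only(SAMPLES)])  # prints [1, 3, 4]
```

The key point is that the "last seen" state is kept per device_id, which is what PARTITION BY device_id has to achieve in the streaming query.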

I have tried several Stream Analytics queries that attempt this comparison using the LAG function; however, the examples I found online don't cover per-device comparisons.

Queries such as the following throw errors like: 'LAG' does not allow an 'Order By' clause.

WITH PreviousEvent AS (
    SELECT 
        device_id,
        device_data,
        LAG(device_data) OVER (PARTITION BY device_id ORDER BY EventEnqueuedUtcTime) AS previous_data
    FROM 
        device_stream
)
SELECT 
    device_id,
    device_data,
    previous_data,
    CASE WHEN device_data != previous_data THEN 'Value changed' ELSE 'Value unchanged' END AS change_status
FROM 
    PreviousEvent

I'm sure this is a common requirement for Stream Analytics queries, and I'm surprised I can't find any concrete examples that show how it is achieved. Any advice or guidance is appreciated.


Solution

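    • Stream Analytics does not support an ORDER BY clause inside LAG (refer to this SO answer). Events are instead processed in the order of the timestamp defined by TIMESTAMP BY.

    • Use LAG in Azure Stream Analytics, partitioned by device ID, so that each event is compared only with the previous event from the same device.

    • The CollectTop function in Azure Stream Analytics can also be used.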

    WITH PreviousEvent AS (
        SELECT
            IoTHub.ConnectionDeviceId AS device_id,
            CurrentTemperature AS current_temperature,
            EventEnqueuedUtcTime,
            -- LAG returns the value from the previous event of the same device (PARTITION BY).
            -- LAG has no ORDER BY in Stream Analytics: events are ordered by the
            -- TIMESTAMP BY column, and LIMIT DURATION bounds how far back it looks.
            LAG(CurrentTemperature) OVER (
                PARTITION BY IoTHub.ConnectionDeviceId
                LIMIT DURATION(hour, 1)
            ) AS previous_temperature
        FROM
            [certificate122ravi89]
            TIMESTAMP BY EventEnqueuedUtcTime
    )
    SELECT
        pe.device_id AS DeviceId,
        pe.current_temperature AS CurrentTemperature,
        pe.previous_temperature AS PreviousTemperature,
        -- Determine whether the temperature changed
        -- (previous_temperature is NULL for a device's first event within the duration)
        CASE
            WHEN pe.previous_temperature IS NULL
                 OR pe.current_temperature <> pe.previous_temperature THEN 'Value changed'
            ELSE 'Value unchanged'
        END AS change_status
    FROM
        PreviousEvent pe
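    To reason about what a per-device lookup such as LAG(...) OVER (PARTITION BY device_id LIMIT DURATION(hour, 1)) produces, here is a rough local model in Python. It is a sketch for checking expectations, not Stream Analytics code: Event, lag_by_device and change_status are hypothetical names, and treating an event exactly one hour old as still in range is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


@dataclass
class Event:
    device_id: str
    enqueued: datetime  # plays the role of the TIMESTAMP BY column
    temperature: float


def lag_by_device(events: List[Event],
                  limit: timedelta = timedelta(hours=1)) -> List[Tuple[Event, Optional[float]]]:
    """Pair each event with the previous temperature from the same device.

    Rough model of LAG(temperature) OVER (PARTITION BY device_id LIMIT DURATION(hour, 1)):
    events are taken in timestamp order, and the previous value is None (NULL)
    when the device has no earlier event within `limit`.
    """
    last: Dict[str, Event] = {}
    pairs: List[Tuple[Event, Optional[float]]] = []
    for event in sorted(events, key=lambda e: e.enqueued):
        prev = last.get(event.device_id)
        # Assumption: an event exactly `limit` old still counts as in range.
        in_range = prev is not None and event.enqueued - prev.enqueued <= limit
        pairs.append((event, prev.temperature if in_range else None))
        last[event.device_id] = event
    return pairs


def change_status(current: float, previous: Optional[float]) -> str:
    """Treat a missing previous value as a change so a device's first event is forwarded."""
    if previous is None or current != previous:
        return "Value changed"
    return "Value unchanged"


T0 = datetime(2024, 4, 13, 12, 0)
# Hypothetical events for two devices.
EVENTS = [
    Event("dev1", T0, 20.0),
    Event("dev2", T0 + timedelta(minutes=1), 25.0),
    Event("dev1", T0 + timedelta(minutes=2), 20.0),
    Event("dev1", T0 + timedelta(minutes=3), 21.5),
    Event("dev1", T0 + timedelta(hours=2), 21.5),  # more than 1 hour after the previous dev1 event
]

if __name__ == "__main__":
    for event, previous in lag_by_device(EVENTS):
        print(event.device_id, event.temperature, previous,
              change_status(event.temperature, previous))
```

    The last dev1 event shows why the LIMIT DURATION matters: after a gap longer than the duration there is no previous value, so the event is reported as a change even though the temperature is the same as before.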
    


    These are the steps to send data from Azure IoT Hub to SQL Server using a Stream Analytics job:

    • Create a Stream Analytics job.
    • Create an Azure IoT Hub and device.

    I sent sample data to the IoT Hub using the sample code below. Replace the payload with the data you want to send.

    The code below sends data from a device to an IoT Hub using the Azure IoT Python SDK. The data being sent includes an energy type (randomly chosen from "Gas", "Water", or "Electricity") and a value (a random float between 0 and 100).

    from azure.iot.device import IoTHubDeviceClient, Message
    import json
    import random
    import time
    
    # Connection string for the device
    CONNECTION_STRING = ""
    
    def send_data(device_client):
        # Sample data
        data = {
            "energyType": random.choice(["Gas", "Water", "Electricity"]),
            "value": random.uniform(0, 100)
        }
    
        # Convert data to JSON and create a message
        message = Message(json.dumps(data))
    
        # Send the message to IoT Hub
        print("Sending message:", message)
        device_client.send_message(message)
        print("Message sent successfully")
    
    if __name__ == "__main__":
        # Create a device client
        client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
        try:
            while True:
                # Send data periodically
                send_data(client)
                time.sleep(5)  # Adjust the interval as needed
    
        except KeyboardInterrupt:
            print("IoTHubClient sample stopped")
        finally:
            # Close the connection to IoT Hub cleanly
            client.shutdown()
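    To exercise change detection with the payload shape from the question instead of the energy sample, send_data could send a body like the one built below. This is a sketch: make_channel_payload is a hypothetical helper, and the random 0/1 channel values and the 50% repeat probability are arbitrary choices for testing.

```python
import json
import random
import time
from typing import List, Optional, Sequence


def make_channel_payload(pid: int, device_id: str,
                         previous: Optional[Sequence[int]] = None,
                         repeat_probability: float = 0.5,
                         rng: Optional[random.Random] = None) -> dict:
    """Build a payload shaped like the question's JSON (hypothetical helper).

    With probability `repeat_probability` the previous channel values are reused,
    so only the timestamp differs and a change filter should drop the message.
    """
    if rng is None:
        rng = random.Random()
    if previous is not None and rng.random() < repeat_probability:
        values: List[int] = list(previous)
    else:
        values = [rng.randint(0, 1) for _ in range(6)]
    channels = {"timestamp": int(time.time())}
    channels.update({f"chd{i}_value": v for i, v in enumerate(values, start=1)})
    return {"pid": pid, "device_id": device_id, "channels": channels}


if __name__ == "__main__":
    previous = None
    for pid in range(1, 4):
        payload = make_channel_payload(pid, "WRL11111111", previous)
        # In send_data, this JSON string would be wrapped in Message(...)
        print(json.dumps(payload))
        previous = [payload["channels"][f"chd{i}_value"] for i in range(1, 7)]
```

    Repeating the previous values on purpose makes it easy to see in the SQL table whether unchanged samples are being filtered out.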
    

    Create a SQL server, a SQL database, and a table for the above data.

    CREATE TABLE energy_data (
    EventId INT PRIMARY KEY IDENTITY(1,1), -- Unique ID for each event
    EnergyType VARCHAR(20), -- Type of energy (e.g., Gas, Water, Electricity)
    Value FLOAT, -- Value of the energy type
    EventProcessedUtcTime DATETIME, -- Time when the event was processed in UTC
    PartitionId INT, -- Partition ID for the event
    EventEnqueuedUtcTime DATETIME, -- Time when the event was enqueued in UTC
    ConnectionDeviceId VARCHAR(50), -- Connection device ID
    ConnectionDeviceGenerationId VARCHAR(50), -- Connection device generation ID
    EnqueuedTime DATETIME -- Time when the event was enqueued in UTC
    );
    

    Set the job's input to your IoT Hub and its output to your SQL database. In Stream Analytics, select 0 as the partition key.

    SELECT
    energyType AS EnergyType,
    value AS Value,
    EventProcessedUtcTime,
    PartitionId,
    EventEnqueuedUtcTime,
    IoTHub.ConnectionDeviceId AS ConnectionDeviceId,
    IoTHub.ConnectionDeviceGenerationId AS ConnectionDeviceGenerationId,
    IoTHub.EnqueuedTime AS EnqueuedTime
    INTO [sql]
    FROM [IoTHub];
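    The SELECT above renames the input fields and lifts the nested IoTHub metadata record into top-level columns whose names match the energy_data table (EventId is generated by IDENTITY, so it is not selected). A small Python sketch of that projection on one event dictionary can help when checking that the aliases and table columns line up. The event shape and values are assumptions for illustration, and to_energy_row is a hypothetical helper, not part of any SDK.

```python
from typing import Any, Dict

# energy_data columns written by the query (EventId is an IDENTITY column, so it is omitted).
ENERGY_DATA_COLUMNS = (
    "EnergyType", "Value", "EventProcessedUtcTime", "PartitionId",
    "EventEnqueuedUtcTime", "ConnectionDeviceId",
    "ConnectionDeviceGenerationId", "EnqueuedTime",
)


def to_energy_row(event: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one input event the way the SELECT ... INTO [sql] query does."""
    iot_hub = event.get("IoTHub") or {}
    return {
        "EnergyType": event["energyType"],
        "Value": event["value"],
        "EventProcessedUtcTime": event.get("EventProcessedUtcTime"),
        "PartitionId": event.get("PartitionId"),
        "EventEnqueuedUtcTime": event.get("EventEnqueuedUtcTime"),
        # The nested IoTHub metadata record becomes top-level columns
        "ConnectionDeviceId": iot_hub.get("ConnectionDeviceId"),
        "ConnectionDeviceGenerationId": iot_hub.get("ConnectionDeviceGenerationId"),
        "EnqueuedTime": iot_hub.get("EnqueuedTime"),
    }


# Hypothetical input event; real values come from the IoT Hub input at run time.
SAMPLE_EVENT = {
    "energyType": "Gas",
    "value": 42.5,
    "EventProcessedUtcTime": "2024-04-13T12:00:06.0000000Z",
    "PartitionId": 0,
    "EventEnqueuedUtcTime": "2024-04-13T12:00:05.0000000Z",
    "IoTHub": {
        "ConnectionDeviceId": "device1",
        "ConnectionDeviceGenerationId": "638486000000000000",
        "EnqueuedTime": "2024-04-13T12:00:05.0000000Z",
    },
}

if __name__ == "__main__":
    print(to_energy_row(SAMPLE_EVENT))
```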
    

    Run the job, then open the SQL database and run the following query to see the data.

    SELECT * FROM [dbo].[energy_data]
