Search code examples
jsonazureazure-stream-analytics

Combine individual messages to 1 output every 15 seconds


Is it possible to combine separate device messages received every 15 seconds into 1 summarized device message?

For example I want to collect "Temperature","RPM" and "Gauge" messages:

Message 1

{
   DeviceId: 1
   Key: "Temperature" 
   Value: 15
}


Message 2   
{
    DeviceId: 1,
    Key: "RPM"
    Value: 2000
}

Into:

{
  DeviceId: 1,
  Readings:
  {
     "Temperature": 15,
     "RPM": 2000 
     "Gauge": "Unknown" (no value in the 15 seconds window, so unknown)
  }
}

EDIT: Forgot to add, it is expected that if 1 of 3 messages for a device ("Temperature","RPM" and "Gauge") arrives in SA. The other 2 will be likely received within the next 15 seconds. So the SA job should start "collecting" when it sees 1 of the messages. Is a SA job suitable for this or should I look for another solution?


Solution

  • You can do this using the LAG function. Would look something like this:

    SELECT 
         DeviceId, [Key], [Value] 
         LAG(reading) OVER (PARTITION BY DeviceId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
         FROM input
    

    LAG allows you to look a specified number of events and then also for a given partition key (in this case I used the DeviceId).