
Getting the time frame in which a series of messages is received in Stream Analytics


I am streaming event messages that contain a POSIX/epoch time field (in milliseconds). I am trying to calculate the time frame in which I received a series of messages from a device.

Let's assume the following (simplified) input:

[
   { "deviceid":"device01", "epochtime":1500975613660 },
   { "deviceid":"device01", "epochtime":1500975640194 },
   { "deviceid":"device01", "epochtime":1500975649627 },
   { "deviceid":"device01", "epochtime":1500994473225 },
   { "deviceid":"device01", "epochtime":1500994486725 }
]

The result of my calculation should be one message of the form {deviceid, start, end} for each time frame of each device id. I assume that a new time frame starts if the interval between two consecutive events is longer than one hour. In my example, this results in two transmissions:

[
   {"deviceid":"device01", "start":1500975613660, "end":1500975649627},
   {"deviceid":"device01", "start":1500994473225, "end":1500994486725}
]
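To make the grouping rule concrete, here is a minimal offline sketch of gap-based grouping in Python. It is not Stream Analytics code; the function name time_frames and the constant GAP_MS are hypothetical, and it assumes the whole batch of events is available at once (a streaming job would have to do this incrementally).

```python
from itertools import groupby

# Threshold from the question: a new time frame starts when two consecutive
# events from the same device are more than one hour apart (epoch ms).
GAP_MS = 60 * 60 * 1000


def time_frames(events, gap_ms=GAP_MS):
    """Group events into per-device frames of the form {deviceid, start, end}."""
    frames = []
    # Sort by device, then by time, so each device's events are contiguous and ordered.
    ordered = sorted(events, key=lambda e: (e["deviceid"], e["epochtime"]))
    for device, group in groupby(ordered, key=lambda e: e["deviceid"]):
        start = end = None
        for e in group:
            t = e["epochtime"]
            if start is None:
                start = end = t  # first event of this device opens a frame
            elif t - end > gap_ms:
                # Gap longer than one hour: close the current frame, open a new one.
                frames.append({"deviceid": device, "start": start, "end": end})
                start = end = t
            else:
                end = t  # still inside the current frame
        frames.append({"deviceid": device, "start": start, "end": end})
    return frames


events = [
    {"deviceid": "device01", "epochtime": 1500975613660},
    {"deviceid": "device01", "epochtime": 1500975640194},
    {"deviceid": "device01", "epochtime": 1500975649627},
    {"deviceid": "device01", "epochtime": 1500994473225},
    {"deviceid": "device01", "epochtime": 1500994486725},
]

for frame in time_frames(events):
    print(frame)
```

On the sample input this yields the two frames listed above, because only the jump from 1500975649627 to 1500994473225 (about 5.2 hours) exceeds the one-hour threshold.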

I can convert the epoch time as shown in example 2 of the documentation at https://msdn.microsoft.com/en-us/library/azure/mt573293.aspx. However, I cannot use the converted timestamp with the LAG function in a subquery: all values for previousTime are null in the output.

WITH step1 AS (
SELECT
   [deviceid] AS deviceId,
   System.Timestamp AS ts,
   LAG([ts]) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM 
   input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') 
)
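The expression DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') simply adds the millisecond offset to the Unix epoch. As a rough cross-check outside the job, the same conversion can be sketched in Python; the helper name epoch_ms_to_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# The Unix epoch in UTC, matching the '1970-01-01T00:00:00Z' literal in the query.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_utc(epoch_ms: int) -> datetime:
    # Same idea as DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z').
    return EPOCH + timedelta(milliseconds=epoch_ms)


print(epoch_ms_to_utc(1500975613660).isoformat())  # 2017-07-25T09:40:13.660000+00:00
```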

I am not sure how to perform this calculation or what the best way to do it is. I need to find the beginning and end of each event series.

Any help is very much appreciated.


Solution

  • I slightly modified your query to get the expected result:

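    WITH STEP1 AS (
      SELECT
        [deviceid] AS deviceId,
        System.Timestamp AS ts,
        -- LAG reads fields of the previous input event, so repeat the conversion
        -- instead of using the "ts" alias; PARTITION BY keeps the lag per device.
        LAG(DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z'))
          OVER (PARTITION BY [deviceid] LIMIT DURATION(hour, 24)) AS previousTime
      FROM
        input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z')
    )
    SELECT * FROM STEP1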
    

    The problem is that "ts" is defined in the current step, but LAG looks at the original message coming from the FROM clause, and that message doesn't contain a "ts" field.
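To illustrate why the lagged expression must be built from input fields, and how previousTime can then mark the start of each series, here is a rough Python model of LAG(expr) OVER (PARTITION BY deviceid LIMIT DURATION(hour, 24)). It is a simplification, not the engine's actual semantics: the function with_previous_time and the isFrameStart flag are hypothetical, events are assumed to arrive as one batch, and the 24-hour boundary is treated as inclusive, which is an assumption.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
LIMIT = timedelta(hours=24)  # assumed analogue of LIMIT DURATION(hour, 24)
GAP = timedelta(hours=1)     # the question's "new time frame" threshold


def to_ts(epoch_ms):
    # Timestamp derived from an input field, like the DATEADD(...) expression.
    return EPOCH + timedelta(milliseconds=epoch_ms)


def with_previous_time(events):
    """Attach previousTime and a hypothetical isFrameStart flag to each event.

    The lagged value is recomputed from the previous *input* event's epochtime;
    an alias defined in the current SELECT would not exist on that event.
    """
    last_seen = {}  # deviceid -> timestamp of that device's previous event
    rows = []
    for e in sorted(events, key=lambda e: e["epochtime"]):
        ts = to_ts(e["epochtime"])
        prev = last_seen.get(e["deviceid"])
        if prev is not None and ts - prev > LIMIT:
            prev = None  # outside the look-back window: treated as NULL
        rows.append({
            "deviceId": e["deviceid"],
            "ts": ts,
            "previousTime": prev,
            # A series starts with no previous event or after a gap over one hour.
            "isFrameStart": prev is None or ts - prev > GAP,
        })
        last_seen[e["deviceid"]] = ts
    return rows


events = [
    {"deviceid": "device01", "epochtime": 1500975613660},
    {"deviceid": "device01", "epochtime": 1500975640194},
    {"deviceid": "device01", "epochtime": 1500975649627},
    {"deviceid": "device01", "epochtime": 1500994473225},
    {"deviceid": "device01", "epochtime": 1500994486725},
]

for row in with_previous_time(events):
    print(row["ts"].isoformat(), row["isFrameStart"])
```

In a real job the equivalent next step would be another query over STEP1 that compares ts with previousTime; the Python above only shows the comparison logic.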

    Let me know if you have any questions.

    Thanks,

    JS - Azure Stream Analytics team