CROSS APPLY an array of values recorded every 10 minutes from a timestamp and generate their timestamps in Stream Analytics


I have the following Stream Analytics input:

{ "ID":"DEV-001-Test",
  "TMSMUTC":"2021-10-14T14:00:00.000",
  "MSGTYP":"TELEMETRY",
  "THING":[
           {
            "TMSDUTC":"2021-10-14T13:00:00.000",
            "DATA":[
                {
                  "TAGID":"TAGB",
                  "VALUE":30
                },
                {
                  "TAGID":"TAGX",
                  "VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
                }
               ]
           }
          ]
}

Here the array of values for "TAGX" holds one sensor reading every 10 minutes for one hour, starting at the timestamp "TMSDUTC":"2021-10-14T13:00:00.000". How could I write a query that gives me output similar to this:

[image: expected output]

My main doubts are how to create the sequence of 10-minute steps from the timestamp and how to cross apply the values to it.


Solution

  • That's a good one! Note that I highly recommend using VS Code and the ASA extension when working on these queries. The developer experience is much nicer than in the portal thanks to local testing, and you can also unit test your query via the npm package.

    I made the following assumptions:

    • THING is an array of a single record. Let me know if that's not the case.
    • [edited] TMSDUTC needs to be incremented by 10 minutes according to the position of each item in the array when applicable (TAGX)

    With that, here is the query. It's split into multiple code blocks to explain the flow, but I also pasted it whole in the last code block.

    First we bring all the required fields to the first level. This makes things easier to read, but it is also necessary: GetArrayElements needs an array to CROSS APPLY, and the return type of GetArrayElement (singular) isn't known at compile time. Using an intermediary query step solves that.

    WITH things AS (
        SELECT
            ID,
            GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
            MSGTYP AS MessageType,
            GetArrayElement(THING,0).DATA AS DATA
        FROM [input]
    ),
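
    The step above relies on the assumption that THING holds a single record. If it can hold several, a minimal, untested sketch of an alternative first step would expand THING with CROSS APPLY instead of reading index 0. The aliases I and TH are only illustrative; the rest of the query would stay the same:

    ```sql
    -- Sketch only: variant of the first step when THING may contain
    -- more than one record. Each element of THING becomes its own row.
    WITH things AS (
        SELECT
            I.ID,
            TH.ArrayValue.TMSDUTC AS TMSDUTC,
            I.MSGTYP AS MessageType,
            TH.ArrayValue.DATA AS DATA
        FROM [input] I
        CROSS APPLY GetArrayElements(I.THING) AS TH
    ),
    ```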
    

    Then we expand DATA:

    dataAll AS (
        SELECT
            T.ID,
            T.TMSDUTC,
            T.MessageType,
            D.ArrayValue.TAGID AS Tag,
            D.ArrayValue.Value AS [Value]
        FROM things T
        CROSS APPLY GetArrayElements(T.DATA) AS D
    ),
    

    Then we create a subset for records that have a VALUE of type array (TAGX in your example). Here I avoid hard-coding per tag by detecting the type at runtime. These records will need another round of array processing in the following step.

    dataArrays AS (
        SELECT
            A.ID,
            A.TMSDUTC,
            A.MessageType,
            A.Tag,
            A.[Value]
        FROM dataAll A
        WHERE GetType(A.[Value]) = 'array'
    ),
    

    Now we can focus on expanding VALUE for those records. Note that we could not do that in a single pass (filter on arrays above and CROSS APPLY below), as GetArrayElements checks types before filtering is done.

    [edited] To increment TMSDUTC, we use DATEADD on the index of each item in its array (ArrayIndex/ArrayValue are both returned from the array expansion, see doc below).

    dataArraysExpanded AS (
        SELECT
            A.ID,
            DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
            A.MessageType,
            A.Tag,
            V.ArrayValue AS [Value]
        FROM dataArrays A
        CROSS APPLY GetArrayElements(A.[Value]) AS V
    ),
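
    To make the DATEADD arithmetic concrete, here is the same expansion annotated with what it should yield for the TAGX array of the sample event. The comment table is worked out by hand from the input (ArrayIndex starts at 0) and shows only the time of day, not the exact datetime format ASA will emit:

    ```sql
    -- Sample event: TMSDUTC = 2021-10-14T13:00:00.000, TAGX has 6 values.
    --
    -- ArrayIndex | minutes added (10 * index) | resulting time | Value
    --     0      |             0              |     13:00      | 30.34
    --     1      |            10              |     13:10      | 245.65
    --     2      |            20              |     13:20      | 30.34
    --     3      |            30              |     13:30      | 245.65
    --     4      |            40              |     13:40      | 245.65
    --     5      |            50              |     13:50      | 30.34
    SELECT
        DATEADD(minute, 10 * V.ArrayIndex, A.TMSDUTC) AS TMSDUTC,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
    ```

    The scalar TAGB record (VALUE 30) does not go through this step and keeps the original 13:00 timestamp.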
    

    We union everything back together:

    newSchema AS (
        SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
            UNION
        SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
    )
    

    And finally insert everything into the destination:

    SELECT
        *
    INTO myOutput
    FROM newSchema
    

    [edited] Please note that the only order guaranteed on a result set is the one defined by the event timestamp. If multiple records share the same timestamp, no order is guaranteed among them by default. Here, at the end of the query, all of the newly created events still carry the timestamp of the original event. If you now need to apply time logic to the newly generated TMSDUTC, you will need to output these records to Event Hub and load them in another job using TIMESTAMP BY TMSDUTC. Currently the timestamp can only be set in the very first step of a query.
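
    As a rough illustration of that second job, here is a minimal sketch that reads the flattened records back and applies a time window on the generated TMSDUTC. The names [flattenedInput] and [aggregatedOutput] are hypothetical aliases, and the hourly average is only an example of time logic, not something the question asked for:

    ```sql
    -- Sketch only: second job, fed by the Event Hub that receives
    -- the output of the first job.
    -- [flattenedInput] and [aggregatedOutput] are hypothetical aliases.
    SELECT
        ID,
        Tag,
        System.Timestamp() AS WindowEnd,
        AVG(CAST([Value] AS float)) AS AvgValue
    INTO [aggregatedOutput]
    FROM [flattenedInput] TIMESTAMP BY TMSDUTC
    GROUP BY ID, Tag, TumblingWindow(hour, 1)
    ```

    Because TMSDUTC can be well behind the arrival time of the event, the job's late arrival and out-of-order settings will probably need adjusting, otherwise such events may be re-timestamped or dropped.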

    What is used here:

    • GetArrayElement (singular): doc
    • WITH, aka Common Table Expression (CTE): doc
    • CROSS APPLY + GetArrayElements: doc and doc, plus very good ref
    • GetType: doc

    The entire thing for easier copy/pasting:

    -- Step 1: bring the required fields to the first level
    WITH things AS (
        SELECT
            ID,
            GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
            MSGTYP AS MessageType,
            GetArrayElement(THING,0).DATA AS DATA
        FROM [input]
    ),
    -- Step 2: expand DATA, one row per tag
    dataAll AS (
        SELECT
            T.ID,
            T.TMSDUTC,
            T.MessageType,
            D.ArrayValue.TAGID AS Tag,
            D.ArrayValue.Value AS [Value]
        FROM things T
        CROSS APPLY GetArrayElements(T.DATA) AS D
    ),
    -- Step 3: keep only the records whose VALUE is an array
    dataArrays AS (
        SELECT
            A.ID,
            A.TMSDUTC,
            A.MessageType,
            A.Tag,
            A.[Value]
        FROM dataAll A
        WHERE GetType(A.[Value]) = 'array'
    ),
    -- Step 4: expand those arrays, adding 10 minutes per array position
    dataArraysExpanded AS (
        SELECT
            A.ID,
            DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
            A.MessageType,
            A.Tag,
            V.ArrayValue AS [Value]
        FROM dataArrays A
        CROSS APPLY GetArrayElements(A.[Value]) AS V
    ),
    -- Step 5: union scalar records and expanded array records
    newSchema AS (
        SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
            UNION
        SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
    )
    -- Step 6: write everything to the destination
    SELECT
        *
    INTO myOutput
    FROM newSchema