
Processing of late events in SA (Azure Stream Analytics)


I was running a test in which I generated data that was 30 days old. When it was sent to the SA job, all of that input was dropped, although, based on the settings in the Event ordering blade, I expected all of it to be passed through.

Part of the job query contains:

    ---------------all incoming events storage query
    SELECT stream.*
    INTO [iot-predict-SA2-ColdStorage]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime

So my expectation is that everything pushed to the SA job ends up in blob storage.

When I sent events that were only 5 hours old, the input was marked as late (as expected) and processed. In the screenshot, the first marked area shows the input of outdated events with no output (red); the second part shows the late events being processed.
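
As a rough mental model of the late-arrival setting (a simplified sketch based on my reading of the Stream Analytics time-handling docs, not the service's actual implementation): an event counts as late when it arrives more than the late-arrival tolerance after its application time (here `UtcTime`), and the policy then either drops it or adjusts its timestamp to arrival time minus the tolerance. The function and parameter names are hypothetical, and the sketch deliberately does not model the job start time, which is what the solution turned out to depend on.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def apply_late_arrival_policy(
    app_time: datetime,
    arrival_time: datetime,
    tolerance: timedelta,
    policy: str = "adjust",
) -> Optional[datetime]:
    """Return the timestamp an event would be processed with, or None if dropped.

    Hypothetical, simplified model: the job start time is ignored.
    """
    earliest_allowed = arrival_time - tolerance
    if app_time >= earliest_allowed:
        return app_time  # within tolerance: original timestamp is kept
    if policy == "drop":
        return None  # too late: event is discarded
    if policy == "adjust":
        return earliest_allowed  # too late: clamped to the tolerance boundary
    raise ValueError(f"unknown policy: {policy!r}")


if __name__ == "__main__":
    now = datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc)
    # A 5-hour-old event with an assumed 1-hour tolerance is adjusted to 11:00 UTC.
    print(apply_late_arrival_policy(now - timedelta(hours=5), now, timedelta(hours=1)))
    # A 30-day-old event under the "drop" policy is discarded.
    print(apply_late_arrival_policy(now - timedelta(days=30), now, timedelta(hours=1), "drop"))
```

Under "adjust", this model would still emit the 30-day-old events with a clamped timestamp, which is why the drop observed in the test needs the extra explanation about the job start time.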

Full query:

    WITH AlertsBasedOnMin
    AS (
        SELECT stream.SensorGuid
            ,stream.Value
            ,stream.SensorName
            ,ref.AggregationTypeFlag
            ,ref.MinThreshold AS threshold
            ,ref.Count
            ,CASE 
                WHEN (ref.MinThreshold > stream.Value)
                    THEN 1
                ELSE 0
                END AS isAlert
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        WHERE ref.AggregationTypeFlag = 8
        )
        ,AlertsBasedOnMax
    AS (
        SELECT stream.SensorGuid
            ,stream.Value
            ,stream.SensorName
            ,ref.AggregationTypeFlag
            ,ref.MaxThreshold AS threshold
            ,ref.Count
            ,CASE 
                WHEN (ref.MaxThreshold < stream.Value)
                    THEN 1
                ELSE 0
                END AS isAlert
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        WHERE ref.AggregationTypeFlag = 16
        )
        ,alertMinMaxUnion
    AS (
        SELECT *
        FROM AlertsBasedOnMin

        UNION ALL

        SELECT *
        FROM AlertsBasedOnMax
        )
        ,alertMimMaxComputed
    AS (
        SELECT SUM(alertMinMaxUnion.isAlert) AS EventCount
            ,alertMinMaxUnion.SensorGuid AS SensorGuid
            ,alertMinMaxUnion.SensorName
        FROM alertMinMaxUnion
        GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
            ,alertMinMaxUnion.SensorGuid
            ,alertMinMaxUnion.Count
            ,alertMinMaxUnion.AggregationTypeFlag
            ,alertMinMaxUnion.SensorName
        HAVING SUM(alertMinMaxUnion.isAlert) > alertMinMaxUnion.Count
        )
        ,alertsMimMaxComputedMergedWithReference
    AS (
        SELECT System.TIMESTAMP [TimeStampUtc]
            ,computed.EventCount
            ,0 AS SumValue
            ,0 AS AvgValue
            ,0 AS StdDevValue
            ,computed.SensorGuid
            ,computed.SensorName
            ,ref.MinThreshold
            ,ref.MaxThreshold
            ,ref.TimeFrameInSeconds
            ,ref.Count
            ,ref.GatewayGuid
            ,ref.SensorType
            ,ref.AggregationType
            ,ref.AggregationTypeFlag
            ,ref.EmailList
            ,ref.PhoneNumberList
        FROM alertMimMaxComputed computed
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
        )
        ,alertsAggregatedByFunction
    AS (
        SELECT Count(1) AS eventCount
            ,stream.SensorGuid AS SensorGuid
            ,stream.SensorName
            ,ref.[Count] AS TriggerThreshold
            ,SUM(stream.Value) AS SumValue
            ,AVG(stream.Value) AS AvgValue
            ,STDEV(stream.Value) AS StdDevValue
            ,ref.AggregationTypeFlag AS flag
        FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = stream.SensorGuid
        GROUP BY HoppingWindow(Duration(minute, 1), Hop(second, 30))
            ,ref.AggregationTypeFlag
            ,ref.[Count]
            ,ref.MaxThreshold
            ,ref.MinThreshold
            ,stream.SensorGuid
            ,stream.SensorName
        HAVING
            --as this is alert then this factor will be relevant to all of the aggregated queries
            Count(1) >= ref.[Count]
            AND (
                --average
                (
                    ref.AggregationTypeFlag = 1
                    AND (
                        AVG(stream.Value) >= ref.MaxThreshold
                        OR AVG(stream.Value) <= ref.MinThreshold
                        )
                    )
                --sum
                OR (
                    ref.AggregationTypeFlag = 2
                    AND (
                        SUM(stream.Value) >= ref.MaxThreshold
                        OR Sum(stream.Value) <= ref.MinThreshold
                        )
                    )
                --stdev 
                OR (
                    ref.AggregationTypeFlag = 4
                    AND (
                        STDEV(stream.Value) >= ref.MaxThreshold
                        OR STDEV(stream.Value) <= ref.MinThreshold
                        )
                    )
                )
        )
        ,alertsAggregatedByFunctionMergedWithReference
    AS (
        SELECT System.TIMESTAMP [TimeStampUtc]
            ,0 AS EventCount
            ,computed.SumValue
            ,computed.AvgValue
            ,computed.StdDevValue
            ,computed.SensorGuid
            ,computed.SensorName
            ,ref.MinThreshold
            ,ref.MaxThreshold
            ,ref.TimeFrameInSeconds
            ,ref.Count
            ,ref.GatewayGuid
            ,ref.SensorType
            ,ref.AggregationType
            ,ref.AggregationTypeFlag
            ,ref.EmailList
            ,ref.PhoneNumberList
        FROM alertsAggregatedByFunction computed
        JOIN [iot-predict-SA2-referenceBlob] ref ON ref.SensorGuid = computed.SensorGuid
        )
        ,allAlertsUnioned
    AS (
        SELECT *
        FROM alertsAggregatedByFunctionMergedWithReference

        UNION ALL

        SELECT *
        FROM alertsMimMaxComputedMergedWithReference
        )


    ---------------alerts storage query 
    SELECT *
    INTO [iot-predict-SA2-Alerts-ColdStorage]
    FROM allAlertsUnioned

    ---------------alerts to alert events query
    SELECT *
    INTO [iot-predict-SA2-Alerts-EventStream]
    FROM allAlertsUnioned


    ---------------alerts to stream query
    SELECT *
    INTO [iot-predict-SA2-TSI-EventStream]
    FROM allAlertsUnioned


    ---------------all incoming events storage query
    SELECT stream.*
    INTO [iot-predict-SA2-ColdStorage]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime


    ---------------all incoming events to time insights query
    SELECT stream.*
    INTO [iot-predict-SA2-TSI-AlertStream]
    FROM [iot-predict-SA2-input] stream TIMESTAMP BY stream.UtcTime

[Screenshot: SA output]


Solution

  • After a chat with the guys from MS, it turned out that my test needed one extra step.

    To have late events processed regardless of the late-event settings, the job has to be started in such a way that the late events count as sent after the job was started. In this particular case, that means starting the SA job with a custom start time set to 30 days ago.
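
A minimal sketch of computing that custom start time, assuming the job is started through the Stream Analytics "Start" REST operation, whose request body (as far as I know) takes `outputStartMode` set to `CustomTime` plus an `outputStartTime`. The helper name and the 30-day offset are just this test's assumptions; the request URL, authentication and api-version are not shown. In the portal, the same thing is done by choosing the custom option when starting the job.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_start_job_body(days_back: int, now: Optional[datetime] = None) -> dict:
    """Hypothetical helper: request body for starting a job at a custom output start time.

    `now` should be timezone-aware; it defaults to the current UTC time.
    """
    now = now or datetime.now(timezone.utc)
    start = now.astimezone(timezone.utc) - timedelta(days=days_back)
    return {
        "outputStartMode": "CustomTime",
        # ISO 8601 UTC timestamp, e.g. 2024-01-01T12:00:00Z
        "outputStartTime": start.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


if __name__ == "__main__":
    # Start the job 30 days in the past so the 30-day-old test events fall after the start time.
    print(json.dumps(build_start_job_body(30), indent=2))
```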