Search code examples
azureazure-stream-analytics

Azure Streaming Analytics stateful aggregation


I have an IoT hub with several devices that send data to Streaming analytics. The messages from devices contain information about their health in float (between 0 and 1). The streaming analytics outputs the data to service bus and I want to add information about the field that contains the average health across devices at given moment.

I wanted to use user-defined aggregates to produce this value every 10 seconds, but it looks like it uses only the last message in the timeframe.

Am I using UDA correctly and if not is there any other way to have an average across several devices or some other stateful functions?

UDA code:

function main() {
this.init = function () {
    this.state = {};
}

this.accumulate = function (value, device_id) {
    this.state[device_id] = value;
}

/*this.deaccumulate = function (value, timestamp) {
    this.state -= value;
}

this.deaccumulateState = function (otherState) {
      this.state -= otherState.state;
}*/

this.computeResult = function () {
    length = 0,
    total  = 0;
    for (var device in this.state) {
        total += this.state[device];
        length++;
    }
    return total/length;
}
}

Query:

SELECT
uda.fleetHealth(device_health_status.level, device_id) as avg_health
INTO
    bustopic2
FROM
    iotdata
GROUP BY TumblingWindow(second, 10)

Solution

  • You can only get the last message as 1 you are using map in Java script. 2 the second parameter are always same and equal to application timestamp even you define it as device_id. if you want to calculate the average level for all devices, you should do like this:

    function UDASample() {
        this.init = function () {
            this.state = 0;
            this.length = 0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.state += value;
            this.length = length + 1;
        }
    
        /*this.deaccumulate = function (value, timestamp) {
            this.state -= value;
        }
    
        this.deaccumulateState = function (otherState) {
              this.state -= otherState.state;
        }*/
    
        this.computeResult = function () {
            return this.state/this.length;
        }
    }
    
    SELECT
    uda.fleetHealth(device_health_status.level) as avg_health
    INTO
        bustopic2
    FROM
        iotdata
    GROUP BY TumblingWindow(second, 10)
    

    if you want to statistic the average level for each devices, you can use same UDA above and use script like this:

    SELECT device_id,
    uda.fleetHealth(device_health_status.level) as avg_health
    INTO
        bustopic2
    FROM
        iotdata
    GROUP BY TumblingWindow(second, 10), device_id