
Stream Analytics UDF - using the output from one UDF in another


The following query results in my GT2HP value being null in the follow-on UDFs:

SELECT 
    UDF.GT2HP(Collect()) as GT2HP,
    UDF.LPLPReturns(Collect()) as LPLPReturns,
    UDF.LPGasHeater(Collect()) as LPGasHeater,
    UDF.HPRaisedSW(Collect(), AVG(GT2HP)) as HPRaisedSW,
    UDF.HPCustomerDemand(Collect(), AVG(GT2HP)) as HPCustomerDemand
INTO SQLDWUKSTEAMLOSS
FROM IotHubInput
WHERE IoTHub.ConnectionDeviceId = 'uk-iotedge'
GROUP BY TumblingWindow(second, 60)

The following code works:

SELECT 
    UDF.GT2HP(Collect()) as GT2HP,
    UDF.LPLPReturns(Collect()) as LPLPReturns,
    UDF.LPGasHeater(Collect()) as LPGasHeater,
    UDF.HPRaisedSW(Collect(), UDF.GT2HP(Collect())) as HPRaisedSW,
    UDF.HPCustomerDemand(Collect(), UDF.GT2HP(Collect())) as HPCustomerDemand
INTO SQLDWUKSTEAMLOSS
FROM IotHubInput
WHERE IoTHub.ConnectionDeviceId = 'uk-iotedge'
GROUP BY TumblingWindow(second, 60)

Obviously the second version is much more computationally expensive, since it evaluates UDF.GT2HP(Collect()) three times per window instead of once, and I'd like to avoid that if possible.

I'd like to use the output of the first UDF in my follow-on UDFs, but it seems to pass on null. All the expressions in the SELECT list appear to be evaluated in parallel rather than in sequence, which would probably explain the null.

Is there a way to use the output of one UDF in another UDF?


Solution

  • The GT2HP column referenced in AVG(GT2HP) is always null because of SQL semantics: column references in the SELECT clause are resolved against the input named in FROM, not against aliases defined elsewhere in the same SELECT list. Since IotHubInput has no GT2HP field, the reference is interpreted as null.
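
    As a rough JavaScript model of that rule (an illustration, not Stream Analytics internals): the aggregate looks GT2HP up on each raw input event, finds nothing, and, following standard SQL AVG semantics where nulls are skipped, returns null when no non-null values remain. The helper avgField and the event field temperature are hypothetical.

    ```javascript
    // Models AVG(field) over one window of raw input events, ignoring null/missing values.
    function avgField(events, field) {
      // The column is looked up on the input event itself, not on SELECT-list aliases.
      const values = events
        .map((e) => e[field])
        .filter((v) => v !== null && v !== undefined);
      if (values.length === 0) return null; // AVG over no non-null values is null
      return values.reduce((a, b) => a + b, 0) / values.length;
    }

    // Hypothetical raw IoT Hub events: they carry sensor fields, but no GT2HP.
    const windowEvents = [{ temperature: 10 }, { temperature: 20 }];
    console.log(avgField(windowEvents, "GT2HP"));       // null
    console.log(avgField(windowEvents, "temperature")); // 15
    ```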

    If you separate your query into multiple steps, as Vignesh suggested, the first step just computes COLLECT over the 60-second window:

    SELECT Collect() AS c
    FROM IotHubInput
    WHERE IoTHub.ConnectionDeviceId = 'uk-iotedge'
    GROUP BY TumblingWindow(second, 60)
    

    Let's name it step1. Because you are grouping only by the window, step1 emits exactly one row, and therefore one value of column c, every 60 seconds. Any aggregation over it is unnecessary unless you aggregate over a larger window that spans more than one of these values.
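
    To make that concrete, here is a small JavaScript simulation (not Stream Analytics itself) of what step1 produces: events are bucketed into 60-second tumbling windows, and each window yields exactly one collected array. The function tumblingCollect and the event shape { ts, value } are hypothetical stand-ins, and window-boundary handling is simplified.

    ```javascript
    // Simulates COLLECT() over a tumbling window: one output row (one array) per window.
    // Hypothetical event shape: { ts: seconds, value: number }.
    function tumblingCollect(events, windowSeconds) {
      const windows = new Map();
      for (const e of events) {
        // Each event falls into exactly one non-overlapping window, keyed by its end time.
        const windowEnd = (Math.floor(e.ts / windowSeconds) + 1) * windowSeconds;
        if (!windows.has(windowEnd)) windows.set(windowEnd, []);
        windows.get(windowEnd).push(e);
      }
      // Like: SELECT Collect() AS c ... GROUP BY TumblingWindow(second, 60)
      return [...windows.entries()]
        .sort((a, b) => a[0] - b[0])
        .map(([windowEnd, c]) => ({ windowEnd, c }));
    }

    const step1 = tumblingCollect(
      [{ ts: 5, value: 1 }, { ts: 30, value: 2 }, { ts: 65, value: 3 }],
      60
    );
    console.log(step1.length); // 2 rows: one per 60-second window
    ```

    Because every window maps to a single row with a single c, an AVG over a value derived from c within that window would just return the value itself.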

    So the AVG in AVG(GT2HP) is unnecessary. The second step then will be:

    SELECT
        c,
        UDF.GT2HP(c) AS GT2HP
    FROM step1
    

    Let's call this step step2. Now the final selection will be:

    SELECT 
        GT2HP,
        UDF.LPLPReturns(c) as LPLPReturns,
        UDF.LPGasHeater(c) as LPGasHeater,
        UDF.HPRaisedSW(c, GT2HP) as HPRaisedSW,
        UDF.HPCustomerDemand(c, GT2HP) as HPCustomerDemand
    INTO SQLDWUKSTEAMLOSS
    FROM step2
    

    And putting it all together:

    WITH step1 AS (
        SELECT Collect() AS c
        FROM IotHubInput
        WHERE IoTHub.ConnectionDeviceId = 'uk-iotedge'
        GROUP BY TumblingWindow(second, 60)
    ),
    step2 AS (
        SELECT
            c,
            UDF.GT2HP(c) AS GT2HP
        FROM step1
    )
    
    SELECT 
        GT2HP,
        UDF.LPLPReturns(c) as LPLPReturns,
        UDF.LPGasHeater(c) as LPGasHeater,
        UDF.HPRaisedSW(c, GT2HP) as HPRaisedSW,
        UDF.HPCustomerDemand(c, GT2HP) as HPCustomerDemand
    INTO SQLDWUKSTEAMLOSS
    FROM step2
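
    On the UDF side nothing special is needed: after the rewrite, UDF.HPRaisedSW simply receives the collected array and the already-computed GT2HP scalar as two ordinary arguments. A minimal sketch, assuming the UDFs are JavaScript UDFs (each defined in Stream Analytics as its own function named main). The function names, the value field, and the calculations are hypothetical placeholders, not the asker's actual logic.

    ```javascript
    // Hypothetical body for UDF.GT2HP: derives one number from the collected events.
    // (In Stream Analytics each UDF would be its own `main`; distinct names let this run in one file.)
    function gt2hpMain(events) {
      if (!events || events.length === 0) return null;
      // Placeholder logic: mean of a hypothetical `value` field.
      return events.reduce((acc, e) => acc + e.value, 0) / events.length;
    }

    // Hypothetical body for UDF.HPRaisedSW: takes the collected array AND the precomputed GT2HP.
    function hpRaisedSWMain(events, gt2hp) {
      // Guard against a null GT2HP, which is what the original single-step query passed in.
      if (gt2hp === null || gt2hp === undefined) return null;
      // Placeholder logic: count events whose value exceeds GT2HP.
      return events.filter((e) => e.value > gt2hp).length;
    }

    // One window through step2 and the final SELECT: GT2HP is computed once, then reused.
    const c = [{ value: 1 }, { value: 2 }, { value: 6 }];
    const gt2hp = gt2hpMain(c);                  // 3
    const hpRaisedSW = hpRaisedSWMain(c, gt2hp); // 1
    console.log({ gt2hp, hpRaisedSW });
    ```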