Search code examples
azure-functionsazure-iot-hubazure-stream-analyticsazure-durable-functionsazure-digital-twins

Durable Entity function doesn't trigger or receive data from streaming analytics


The durable entity function I've setup as output for a streaming analytics job doesn't trigger or receive any data. I can see incoming input data in the stream analytics job and it outputs to store the data correctly (for a stream analytics job storage output test I also setup). If I test the CalculatePositioni function with postman the function and entity receives the data and updates the state correctly.

I also get a binding error for the function app and I can't see what's missing:

Stream Analytics Job output with my duration funcapp setup: enter image description here

I selected keyname default at output creation. I note don't see anywhere or local.settings.json in the function app where I'd need to enter the hashed key.

enter image description here

My function app.

namespace BladeFunctionApps
{

public static class TransformAlgorithm
{
    public static async Task Process_Algorithm([EntityTrigger] IDurableEntityContext context, ILogger log)
    {
        //Set or Get intermediate calculation state values
        Algorithm_SaveStateVariables intermediate_calc_results;
        if (context.HasState)
        {
            intermediate_calc_results = context.GetState<Algorithm_SaveStateVariables>();
            log.LogInformation("Success getting state values: CumItems:" + intermediate_calc_results.AverageMagnetometerX + 
                " Avg:"+ intermediate_calc_results.AverageMagnetometerX);
        }
        else
        {
            log.LogError("Failed to get state values.");
            intermediate_calc_results = new Algorithm_SaveStateVariables();
        }

        //Get values from incoming Json object and perform calculations
        JObject deviceBatchMessage = (JObject)JsonConvert.DeserializeObject(context.GetInput<string>());

        Random rnd = new Random();
        int incomingDataLines = rnd.Next(1, 10);
        intermediate_calc_results.DataItemsNumber=incomingDataLines++;

        bool parsebool=float.TryParse(deviceBatchMessage["body"]["magnetX"].ToString(), out float tempMagnX);
        if(parsebool)
            intermediate_calc_results.AverageMagnetometerX+= tempMagnX;
        else
        {
            log.LogError("Could not parse to float: " + deviceBatchMessage["body"]["magnetX"].ToString());
            intermediate_calc_results.AverageMagnetometerX += 0;
        }

        context.SetState(intermediate_calc_results);
    }

    [FunctionName("CalculatePosition")]
    public static async Task<ActionResult> CalculateTransform(
        [HttpTrigger(AuthorizationLevel.Function, "get", Route = null)] HttpRequest req,
        [DurableClient] IDurableEntityClient client, ILogger log)
    {
        //read data
        // Extract the body from the request
        log.LogInformation("I'm at the top of CalculatePosition");
        string jsonContent = await new StreamReader(req.Body).ReadToEndAsync();
        if (string.IsNullOrEmpty(jsonContent))
        {
            log.LogInformation("CalculatePosition: Received Data null or empty.");
            return new StatusCodeResult(204); // 204, ASA connectivity check
        }
        else
            log.LogInformation("CalculatePosition: Received Data:" + jsonContent);

        //send data to algo for processing
        var entityId = new EntityId(nameof(Process_Algorithm), "ProcessTransformAlgorithm");
        await client.SignalEntityAsync(entityId,"FullOperation",jsonContent);

        return (ActionResult)new OkObjectResult($"CalculatePosition: CalculatePosition Success");
    }
}
}

enter image description here

What binding is missing? enter image description here

enter image description here

The input/output setup with Query in the Stream Analytics Job should work, I think so because it's the same setup that I use to output into a blob and works with the blob, not with Azure Steam analytics.

enter image description here

enter image description here

enter image description here


Solution

  • You need to run a query on your Azure stream analytics job to make the data flow into your Azure function. Before you run the query, you would have to set the output alias name when you bind the Azure function as output to the Azure stream analytics job.

    Here is where you can provide the alias name enter image description here

    Once you have the properties defined, you can create a query to push data into the output alias and onto the Azure function. Here is a sample query showing the same. enter image description here

    The query varies based on the data or fields you have in the stream analytics job. Please refer to this tutorial Update the Stream Analytics job with the function as output for more detailed information. Here is another example that would guide you with the binding and the configuration process.

    Here is an HTTPTrigger function I have created to test the data flow between Azure Stream Analytics job and Azure function. The behavior would be the same for durable function as it also uses HTTPTrigger.

    public static class StreamAnalyticsFunction
    {
        [FunctionName("StreamAnalyticsFunction")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
    
            string name = req.Query["name"];
    
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            dynamic data = JsonConvert.DeserializeObject(requestBody);
            name = name ?? data?.name;
    
            string responseMessage = string.IsNullOrEmpty(name)
                ? "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response."
                : $"Hello, {name}. This HTTP triggered function executed successfully.";
    
            return new OkObjectResult(responseMessage);
        }
    }
    

    Here is my function integration and data flow

    enter image description here enter image description here enter image description here