Tags: azure, azure-storage, azure-iot-hub, azure-stream-analytics

Can I add the device ID to the path when storing data from Azure Stream Analytics to Blob storage?


I have data coming in from different devices to IoT Hub, and I use Stream Analytics to process it and store it in Blob storage. I know we can add {date} and {time} to the path in the required format. Can we add the deviceId to that path too?

Example: for 2018/10/30/01 (year/month/day/hour), can I add /deviceId to that path when storing to blob?


Solution

  • The following is an example of a workaround for your case. It uses an Azure Function (HttpTrigger) as the output of the ASA job, and the function appends each event to a specific blob in a push manner. Note that this workaround sets the "Max batch count" of the Azure Function output to 1, so events are delivered one telemetry message at a time.

    ASA job query:

    SELECT
      System.Timestamp as [time], * 
    INTO outAF
    FROM 
      iot TIMESTAMP BY time
    

    Azure Function (HttpTrigger):

    run.csx

    #r "Newtonsoft.Json"
    #r "Microsoft.WindowsAzure.Storage"
    
    using System.Net;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Primitives;
    using Microsoft.WindowsAzure.Storage.Blob;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    
    public static async Task<IActionResult> Run(string body, CloudBlobContainer blobContainer, ILogger log)
    {
        log.LogInformation($"{body}");

        // The body can be a JSON array or a single object.
        // SingleOrDefault expects at most one event (Max batch count = 1).
        var jtoken = JToken.Parse(body);
        var jobject = jtoken is JArray ? jtoken.SingleOrDefault<JToken>() : jtoken;
        if(jobject != null)
        {
            var jtext = jobject.ToString(Formatting.None);
            // Read only IoTHub.ConnectionDeviceId from the event.
            var data = JsonConvert.DeserializeAnonymousType(jtext, new {IoTHub = new { ConnectionDeviceId = ""}});
            // "HH" is the 24-hour clock; "hh" would map 13:00 to folder 01.
            var blobName = $"{DateTime.UtcNow.ToString("yyyy/MM/dd/HH")}/{data.IoTHub.ConnectionDeviceId}";
            var blob = blobContainer.GetAppendBlobReference(blobName);
            if(!await blob.ExistsAsync())
            {
                await blob.CreateOrReplaceAsync();
            }
            // Append the event as one line of JSON.
            await blob.AppendTextAsync(jtext + "\r\n");
        }
        return new NoContentResult();
    }

    function.json

    {
      "bindings": [
        {
          "authLevel": "function",
          "name": "body",
          "type": "httpTrigger",
          "direction": "in",
          "methods": [
            "get",
            "post"
          ]
        },
        {
          "name": "blobContainer",
          "type": "blob",
          "path": "myContainer",
          "connection": "mySTORAGE",
          "direction": "out"
        },
        {
          "name": "$return",
          "type": "http",
          "direction": "out"
        }
      ]
    }
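
The blob name built in run.csx above decides the folder layout, so it may help to look at that step on its own. The following is a minimal sketch, not part of the original answer: the `BlobPath` class and its `Build` method are hypothetical names, and it assumes the wanted layout is year/month/day/hour followed by the device ID, as in the question. It uses only the standard library, so it can be tried outside of Azure Functions.

```csharp
using System;
using System.Globalization;

public static class BlobPath
{
    // Hypothetical helper: builds "yyyy/MM/dd/HH/<deviceId>" from a UTC timestamp.
    public static string Build(DateTime utc, string deviceId)
    {
        if (string.IsNullOrWhiteSpace(deviceId))
            throw new ArgumentException("deviceId is required", nameof(deviceId));

        // "HH" is the 24-hour clock; "hh" would map 13:00 to 01.
        // The slashes are quoted so they are emitted literally in any culture.
        string prefix = utc.ToString("yyyy'/'MM'/'dd'/'HH", CultureInfo.InvariantCulture);
        return $"{prefix}/{deviceId}";
    }

    public static void Main()
    {
        var ts = new DateTime(2018, 10, 30, 13, 5, 0, DateTimeKind.Utc);
        Console.WriteLine(Build(ts, "device1")); // prints 2018/10/30/13/device1
    }
}
```

In the function itself, the same idea would apply with `DateTime.UtcNow` and `data.IoTHub.ConnectionDeviceId` as the arguments. To put the device ID first in the path, swap the two parts of the returned string.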