azure azure-sql-database cql azure-iot-hub azure-stream-analytics

Insert/update referenced data from IoT Hub in a SQL table by conditions with Stream Analytics


I am working on Azure. There is a SQL (Transact-SQL) database with two tables, and an IoT Hub that receives data from several devices. I have to check whether the incoming data already exists in the database: if it does not, store it; otherwise, update the existing row in the table.

tableOne like:

id   |   key1
-------------
1    |   abc
2    |   def
3    |   ghi

tableTwo like:

id    |   id_tableOne  |   key2         |   something
-------------------------------------------------------
77    |   2            |   Emil         |   welcome
78    |   1            |   Emil         |   here I am
79    |   1            |   Hans         |   hello world

stream as json msg coming from iotHub like this:

{
    "topic": "test",
    "key1": "ghi",
    "data": [{
        "key2": "Emil",
        "something": "lmn"
    },
    {
        "key2": "Hans",
        "something": "hij"
    },
    {
        "key2": "Gerda",
        "something": "xyz"
    }]
}
  1. I want to get the id from tableOne by "key1" in the json stream
  2. I want to check if the combination of id_tableOne (result from 1.) and key2 exists in tableTwo
  3. if it does exist: update the row in tableTwo - else: insert new row in tableTwo

Solution

  • Azure Stream Analytics takes its streaming input only from Event Hubs, IoT Hub, and Blob storage, so a query cannot read from the SQL database that is configured as its output. That means you can't filter on the contents of your tables inside the job. A query like the following is not allowed:

    SELECT 
    jaysqlserver2.id as id,
    jaysqlserver2.id_tableOne as idTableOne,
    jaysqlserver2.key2 as key2,
    jaysqlserver2.something as something
    from jsoninput
    where jaysqlserver2.id_tableOne = jsoninput.key1 
    

    However, here is a workaround.

    First, flatten the JSON input and save the resulting rows into a temporary (staging) table in your target SQL database.

    SELECT 
        jsoninput.key1, 
        arrayElement.ArrayValue.key2 AS key2,
        arrayElement.ArrayValue.something AS something
    INTO 
        output
    FROM jsoninput
    CROSS APPLY GetArrayElements(jsoninput.data) AS arrayElement
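
    The SQL output of the job needs a table to write into. A minimal sketch of such a staging table is shown here. The name dbo.tabletemp is taken from the function's query further down; the column types and lengths are assumptions and should be adapted to your data.

    ```sql
    -- Staging table for the flattened messages (types and lengths are assumptions).
    -- Column names are chosen to match the fields selected by the query.
    CREATE TABLE dbo.tabletemp (
        key1      NVARCHAR(100) NOT NULL,
        key2      NVARCHAR(100) NOT NULL,
        something NVARCHAR(400) NULL
    );

    -- For the sample message in the question, the flattened rows would be:
    --   key1 | key2  | something
    --   ghi  | Emil  | lmn
    --   ghi  | Hans  | hij
    --   ghi  | Gerda | xyz
    ```

    Each element of the "data" array becomes one row, and key1 is repeated on every row so that it can be joined to tableOne later.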
    


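    Then you can use an Azure Function with a timer trigger to process the staging table in the SQL database automatically (see the Azure Functions documentation). Please refer to the following sketch, which assumes that the id column of table2 is generated by the database: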

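    #r "System.Configuration"
    #r "System.Data"
    using System;
    using System.Configuration;
    using System.Data;
    using System.Data.SqlClient;

    // Timer-triggered function (C# script); the schedule is set in function.json.
    public static void Run(TimerInfo myTimer, TraceWriter log)
    {
        var str = ConfigurationManager.ConnectionStrings["sqldb_connection"].ConnectionString;
        using (SqlConnection conn = new SqlConnection(str))
        {
            conn.Open();

            // Verbatim string (@"...") so the query can span several lines.
            // Joins the staged rows to table1 (by key1) and to table2 (by id_tableOne + key2).
            var text = @"select temp.key2 as key2temp, temp.something as somethingtemp,
                t1.id as id1, t2.id as id2
                from dbo.tabletemp as temp
                left join dbo.table1 as t1 on temp.key1 = t1.key1
                left join dbo.table2 as t2 on t1.id = t2.id_tableOne and temp.key2 = t2.key2
                where t1.id is not null";

            // Load the result into memory first: the connection cannot run other
            // commands while a SqlDataReader is still open on it.
            var rows = new DataTable();
            using (var adapter = new SqlDataAdapter(text, conn))
            {
                adapter.Fill(rows);
            }

            foreach (DataRow row in rows.Rows)
            {
                // id2 is NULL when the left join found no matching row in table2.
                bool exists = !row.IsNull("id2");
                var sql = exists
                    ? "update dbo.table2 set something = @something where id = @id2"
                    : "insert into dbo.table2 (id_tableOne, key2, something) values (@id1, @key2, @something)";

                using (var cmd = new SqlCommand(sql, conn))
                {
                    cmd.Parameters.AddWithValue("@something", row["somethingtemp"]);
                    if (exists)
                    {
                        cmd.Parameters.AddWithValue("@id2", row["id2"]);
                    }
                    else
                    {
                        cmd.Parameters.AddWithValue("@id1", row["id1"]);
                        cmd.Parameters.AddWithValue("@key2", row["key2temp"]);
                    }
                    cmd.ExecuteNonQuery();
                }
            }

            // Clear the staging table once its rows have been applied.
            using (var delCmd = new SqlCommand("delete from dbo.tabletemp", conn))
            {
                delCmd.ExecuteNonQuery();
            }
        }
    }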
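
    As an alternative to looping over the rows in the function, the insert-or-update decision could be left to the database with a single T-SQL MERGE statement, which the function would then execute as one command. This is only a sketch: it reuses the table names from the query above and assumes that dbo.table2.id is generated by the database (for example an IDENTITY column) and that the staging table holds at most one row per key1/key2 pair.

    ```sql
    BEGIN TRANSACTION;

    -- Source: staged rows resolved to the id of table1 via key1.
    -- TABLOCKX keeps new rows out of the staging table until COMMIT,
    -- so that the DELETE below only removes rows that were merged.
    MERGE dbo.table2 AS target
    USING (
        SELECT t1.id AS id_tableOne, temp.key2, temp.something
        FROM dbo.tabletemp AS temp WITH (TABLOCKX)
        INNER JOIN dbo.table1 AS t1 ON temp.key1 = t1.key1
    ) AS source
    ON target.id_tableOne = source.id_tableOne
       AND target.key2 = source.key2
    WHEN MATCHED THEN
        -- The combination already exists: update the row.
        UPDATE SET target.something = source.something
    WHEN NOT MATCHED BY TARGET THEN
        -- The combination is new: insert a row.
        INSERT (id_tableOne, key2, something)
        VALUES (source.id_tableOne, source.key2, source.something);

    -- Clear the staging table after its rows have been applied.
    DELETE FROM dbo.tabletemp;

    COMMIT TRANSACTION;
    ```

    Note that MERGE raises an error when several source rows match the same target row, so if a device can send the same key1/key2 pair more than once between two runs, the staged rows would have to be reduced to one per pair first.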
    

    Hope this helps. If you have any concerns, please let me know.