I am working on Azure. There is a SQL database (T-SQL) with two tables and an IoT Hub that receives data from several devices. I have to check whether the incoming data already exists in the database: if it does not, store it; otherwise, update the old data in the table.
tableOne looks like this:
id | key1
---------
1  | abc
2  | def
3  | ghi
tableTwo looks like this:
id | id_tableOne | key2 | something
-----------------------------------
77 | 2           | Emil | welcome
78 | 1           | Emil | here I am
79 | 1           | Hans | hello world
The stream arrives from the IoT Hub as a JSON message like this:
{
    "topic": "test",
    "key1": "ghi",
    "data": [{
        "key2": "Emil",
        "something": "lmn"
    },
    {
        "key2": "Hans",
        "something": "hij"
    },
    {
        "key2": "Gerda",
        "something": "xyz"
    }]
}
Regarding the stream data as input to Stream Analytics: only Event Hubs, IoT Hub, and Blob storage are supported as Azure Stream Analytics stream inputs. So you can't filter on conditions against the SQL database that is configured as the output. For example, the following SQL is not allowed:
SELECT
    jaysqlserver2.id as id,
    jaysqlserver2.id_tableOne as idTableOne,
    jaysqlserver2.key2 as key2,
    jaysqlserver2.something as something
FROM jsoninput
WHERE jaysqlserver2.id_tableOne = jsoninput.key1
However, here is a workaround.
First, you could flatten the jsoninput and save the resulting rows into a temporary table in your target SQL database:
SELECT
    jsoninput.key1,
    arrayElement.ArrayValue.key2,
    arrayElement.ArrayValue.something
INTO
    output
FROM jsoninput
CROSS APPLY GetArrayElements(jsoninput.data) AS arrayElement
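To make the effect of the flattening concrete, here is a small Python sketch (Python is used only for illustration, it is not part of the Stream Analytics job) that mimics what CROSS APPLY GetArrayElements does to the sample message: one output row per element of the data array, each carrying the parent key1. The function name flatten is made up for this example.

```python
import json

# Sample message from the question.
message = json.loads("""
{
    "topic": "test",
    "key1": "ghi",
    "data": [
        {"key2": "Emil", "something": "lmn"},
        {"key2": "Hans", "something": "hij"},
        {"key2": "Gerda", "something": "xyz"}
    ]
}
""")


def flatten(msg):
    """Return one (key1, key2, something) row per element of msg["data"]."""
    return [(msg["key1"], item["key2"], item["something"]) for item in msg["data"]]


rows = flatten(message)
for row in rows:
    print(row)
```

Each tuple corresponds to one row that the query would write to the temporary table.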
Then, you could follow this doc to use an Azure Functions timer trigger to process the SQL database automatically. Please refer to the pseudocode below:
#r "System.Configuration"
#r "System.Data"
using System.Configuration;
using System.Data.SqlClient;

// Timer-triggered entry point (C# script); the schedule is set in function.json.
public static void Run(TimerInfo myTimer, TraceWriter log)
{
    var str = ConfigurationManager.ConnectionStrings["sqldb_connection"].ConnectionString;
    using (SqlConnection conn = new SqlConnection(str))
    {
        conn.Open();
        // Verbatim string (@"...") so the query can span several lines.
        var text = @"select temp.key1 as key1temp,temp.key2 as key2temp,
            t1.id as id1,t1.key1 as key1,
            t2.id as id2,t2.id_tableOne as tableOne,t2.key2 as key2,t2.something as something
            from dbo.tabletemp as temp
            left join dbo.table1 as t1 on temp.key1 = t1.key1
            left join dbo.table2 as t2 on t1.id = t2.id_tableOne and temp.key2 = t2.key2
            where t1.id is not null";
        using (SqlCommand sqlComm = new SqlCommand(text, conn))
        using (SqlDataReader reader = sqlComm.ExecuteReader())
        {
            while (reader.Read())
            {
                // id2 is NULL when table2 has no matching row yet.
                // ToString() on a NULL column returns "", never null, so test for DBNull.
                if (reader.IsDBNull(reader.GetOrdinal("id2")))
                {
                    //execute insert sql
                }
                else
                {
                    //execute update sql
                }
                // Note: while the reader is open this connection is busy, so run the
                // insert/update on a second connection or after buffering the rows.
            }
        }
        // Clear the temp table (dbo.tabletemp, the table read by the query above).
        var delSql = "delete from dbo.tabletemp";
        using (SqlCommand delComm = new SqlCommand(delSql, conn))
        {
            delComm.ExecuteNonQuery();
        }
        // ...
    }
}
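The insert-or-update decision in the loop above can be tried out locally. The following is only a sketch under several assumptions: it uses Python with an in-memory SQLite database as a stand-in for the Azure SQL database, the column types are guessed, and the temp rows use key1 = "abc" instead of the "ghi" from the sample message so that both the update and the insert branch are hit. It reads all join results first and writes afterwards, so no write runs while a result set is still being read.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE table1 (id INTEGER PRIMARY KEY, key1 TEXT);
CREATE TABLE table2 (id INTEGER PRIMARY KEY, id_tableOne INTEGER, key2 TEXT, something TEXT);
CREATE TABLE tabletemp (key1 TEXT, key2 TEXT, something TEXT);
INSERT INTO table1 VALUES (1, 'abc'), (2, 'def'), (3, 'ghi');
INSERT INTO table2 VALUES (77, 2, 'Emil', 'welcome'),
                          (78, 1, 'Emil', 'here I am'),
                          (79, 1, 'Hans', 'hello world');
-- Hypothetical temp rows: the sample message flattened, but with key1 = 'abc'.
INSERT INTO tabletemp VALUES ('abc', 'Emil', 'lmn'), ('abc', 'Hans', 'hij'), ('abc', 'Gerda', 'xyz');
""")

# Same join idea as the C# query: t2.id is NULL when no matching table2 row exists.
pending = conn.execute("""
    SELECT t1.id, t2.id, temp.key2, temp.something
    FROM tabletemp AS temp
    JOIN table1 AS t1 ON temp.key1 = t1.key1
    LEFT JOIN table2 AS t2 ON t1.id = t2.id_tableOne AND temp.key2 = t2.key2
""").fetchall()

actions = []
for id1, id2, key2, something in pending:
    if id2 is None:
        # No existing row for this (key1, key2) pair: insert a new one.
        conn.execute(
            "INSERT INTO table2 (id_tableOne, key2, something) VALUES (?, ?, ?)",
            (id1, key2, something),
        )
        actions.append(("insert", key2))
    else:
        # Existing row found: overwrite its payload.
        conn.execute("UPDATE table2 SET something = ? WHERE id = ?", (something, id2))
        actions.append(("update", key2))

# Empty the temp table once every row has been processed.
conn.execute("DELETE FROM tabletemp")
conn.commit()

result = conn.execute(
    "SELECT id, id_tableOne, key2, something FROM table2 ORDER BY id"
).fetchall()
for row in result:
    print(row)
```

On SQL Server the same check-then-write logic could also be expressed in a single set-based statement, but the row-by-row form stays closest to the pseudocode above.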
Hope it helps. If you have any concerns, please let me know.