I am trying to load and transform data from an API to Azure SQL using ADF copy activity. The data from json API(source) is in below format.
{
"fdgdhgfh": {
"so2_production": 7hjhgj953,
"battery_charge": jkjlkj,
"battery_discharge": kjlklj,
"critical_load_energy": 4ljljh4
},
"9fsdsfb": {
"so2_production": asdasd,
"battery_charge": sdaasf,
"battery_discharge": ewewrwer,
"critical_load_energy": bmvkbjk
}
}
I want to map "fdgdhgfh" or "9fsdsfb" to a column in Azure SQL using copy activity. By default when I import schemas I am getting object values "so2_production","battery_charge","battery_discharge", "critical_load_energy" to map in SQL but I want to map object not the object value.
AFAIK, Using copy activity, it might not be possible to achieve your desired result.
I could able to get it done using combination of variables and script activity.
NOTE: This approach will only work when there is no pagination as it involves usage of web activity. If there is a pagination, you need to do use web activity in every iteration. If you can use dataflow, then it be the better option with combination of derived columns and flatten transformations beacuse dataflow supports REST API and pagination.
These are my variables and flow of pipeline.
Here I have used lookup activity from blob to get your JSON. In your case use web activity here.
Then I have converted the JSON to string and stored in a variable jsonstring
.
@string(activity('Lookup1').output.value[0])
. In your case it will be @string(activity('Web1').output)
.
After that, I have used split on that variable with },
and stored the result in split1
variable.
@split(substring(variables('jsonstring'),1,add(length(variables('jsonstring')),-2)), '},')
which will give result as below.
I have used a ForEach here, and given the above array @variables('split1')
to it.
Inside ForEach, to store the keys of JSON into cols
variable I have used the below expression in each iteration.
@substring(split(item(),':{')[0],1,add(length(split(item(),':{')[0]),-2))
After that, I have used the below expression to store the so2_production
values into values
expression.@activity('Lookup1').output.value[0][variables('cols')]['so2_production']
.
In your case it will be activity('Web1').output[variables('cols')] ['so2_production']
Inside same ForEach after getting the cols and object values, I have used a script activity to insert the data from variables into target table.
insert into sample1(uid,usource) values('@{variables('cols')}','@{variables('values')}')
It will insert each key and object value for every iteration.
My Pipeline JSON:
{
"name": "pipeline3",
"properties": {
"activities": [
{
"name": "Lookup1",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "JsonSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "JsonReadSettings"
}
},
"dataset": {
"referenceName": "sourcejson",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "Lookup output to string",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "jsonstring",
"value": {
"value": "@string(activity('Lookup1').output.value[0])",
"type": "Expression"
}
}
},
{
"name": "split on string",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Lookup output to string",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "split1",
"value": {
"value": "@split(substring(variables('jsonstring'),1,add(length(variables('jsonstring')),-2)), '},')",
"type": "Expression"
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "split on string",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@variables('split1')",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Cols",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "cols",
"value": {
"value": "@substring(split(item(),':{')[0],1,add(length(split(item(),':{')[0]),-2))",
"type": "Expression"
}
}
},
{
"name": "values",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Cols",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "values",
"value": {
"value": "@activity('Lookup1').output.value[0][variables('cols')]['so2_production']",
"type": "Expression"
}
}
},
{
"name": "Script1",
"type": "Script",
"dependsOn": [
{
"activity": "values",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"linkedServiceName": {
"referenceName": "AzureSqlDatabase1",
"type": "LinkedServiceReference"
},
"typeProperties": {
"scripts": [
{
"type": "Query",
"text": {
"value": "insert into sample1(uid,usource) values('@{variables('cols')}','@{variables('values')}')",
"type": "Expression"
}
}
],
"scriptBlockExecutionTimeout": "02:00:00"
}
}
]
}
}
],
"variables": {
"jsonstring": {
"type": "String"
},
"split1": {
"type": "Array"
},
"cols": {
"type": "String"
},
"values": {
"type": "String"
}
},
"annotations": []
}
}
Result: