I am bit new using Data Flow in ADF so thats why im asking u for help. here's the situation:
I would like to transform a Json file from REST API in Data Flow for creating a tabular/ logical table structure. After transformation i would like to send this data + structure to a Azure SQL Database.
Here is a overview what i've done:
Created a Pipeline to copy data from a website to my Data Lake.
This is the input source:
{
"id":[
{
"value":40051
}
],
"uuid":[
{
"value":"0ca12ac9-d94b-44cf-a35b-8b7256006cf8"
}
],
"revision_id":[
{
"value":1452381
}
],
"langcode":[
{
"value":"nl"
}
],
"type":[
{
"target_id":"par_chart",
"target_type":"paragraphs_type",
"target_uuid":"2c3143a2-bd78-4b4d-afb6-19160de928f2"
}
],
"status":[
{
"value":true
}
],
"created":[
{
"value":"2019-10-17T12:08:05+00:00",
"format":"Y-m-d\\TH:i:sP"
}
],
"parent_id":[
{
"value":"2561"
}
],
"parent_type":[
{
"value":"node"
}
],
"parent_field_name":[
{
"value":"field_paragraphs"
}
],
"behavior_settings":[
{
"value":[
]
}
],
"default_langcode":[
{
"value":true
}
],
"revision_translation_affected":[
{
"value":true
}
],
"content_translation_source":[
{
"value":"und"
}
],
"content_translation_outdated":[
{
"value":false
}
],
"content_translation_changed":[
{
"value":"2023-05-09T09:48:38+00:00",
"format":"Y-m-d\\TH:i:sP"
}
],
"field_par_chart":[
{
"csv":"[[\"\",\"Percentage behandeling\",\"Percentage behandeling\",\"Percentage behandeling\"],[\"2010\",\"19.4\",null,\"\"],[\"2011\",\"16.6\",null,\"\"],[\"2012\",\"15.4\",null,\"\"],[\"2013\",\"13.5\",null,\"\"],[\"2014\",\"13\",null,\"\"],[\"2015\",\"13\",null,\"\"],[\"2016\",\"14.1\",null,\"\"],[\"2017\",\"17.7\",null,\"\"],[\"2018\",\"24\",null,\"\"],[\"2019\",\"\",\"27.7\",\"\"],[\"2020\",\"\",\"31.9\",\"\"],[\"2021*\",\"\",\"\",\"42.9\"],[\"2022*\",null,\"\",\"46.2\"]]",
"csv_url":"",
"config":"{\"chart\":{\"type\":\"line\",\"renderTo\":{\"hcEvents\":{\"mousedown\":[{\"order\":null}],\"touchstart\":[{\"order\":null}],\"mouseover\":[{\"order\":null}],\"mouseout\":[{\"order\":null}]},\"__EV_STORE_KEY@7\":{}}},\"xAxis\":[{\"type\":\"category\",\"index\":0,\"isX\":true}],\"yAxis\":[{\"title\":{\"text\":\"Percentage\",\"offset\":-81.859375},\"min\":0,\"tickInterval\":5,\"index\":0,\"events\":{}}],\"templateName\":\"lineBasic\",\"series\":[{\"type\":null,\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":19.4},{\"name\":\"2011\",\"y\":16.6},{\"name\":\"2012\",\"y\":15.4},{\"name\":\"2013\",\"y\":13.5},{\"name\":\"2014\",\"y\":13},{\"name\":\"2015\",\"y\":13},{\"name\":\"2016\",\"y\":14.1},{\"name\":\"2017\",\"y\":17.7},{\"name\":\"2018\",\"y\":24},{\"name\":\"2019\",\"y\":null},{\"name\":\"2020\",\"y\":null},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\",\"_colorIndex\":0,\"_symbolIndex\":0},{\"type\":\"line\",\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":null},{\"name\":\"2011\",\"y\":null},{\"name\":\"2012\",\"y\":null},{\"name\":\"2013\",\"y\":null},{\"name\":\"2014\",\"y\":null},{\"name\":\"2015\",\"y\":null},{\"name\":\"2016\",\"y\":null},{\"name\":\"2017\",\"y\":null},{\"name\":\"2018\",\"y\":null},{\"name\":\"2019\",\"y\":27.7},{\"name\":\"2020\",\"y\":31.9},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\"},{\"type\":\"line\",\"animation\":false,\"data\":[{\"y\":null,\"name\":\"2010\"},{\"y\":null,\"name\":\"2011\"},{\"y\":null,\"name\":\"2012\"},{\"y\":null,\"name\":\"2013\"},{\"y\":null,\"name\":\"2014\"},{\"y\":null,\"name\":\"2015\"},{\"y\":null,\"name\":\"2016\"},{\"y\":null,\"name\":\"2017\"},{\"y\":null,\"name\":\"2018\"},{\"y\":null,\"name\":\"2019\"},{\"y\":null,\"name\":\"2020\"},{\"y\":42.9,\"name\":\"2021*\"},{\"y\":46.2,\"name\":\"2022*\"}],\"name\":\"Percentage behandeling\"}],\"title\":{\"text\":\"Trend in wachttijden voor behandeling in ziekenhuis langer dan de Treeknorm\"},\"legend\":{\"enabled\":false}}"
}
],
"field_par_extra_info":[
{
"value":"singlecard"
}
],
"field_par_hidden":[
{
"value":false
}
],
"field_par_text":[
{
"value":"<ul>\r\n\t<li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\r\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\r\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\r\n</ul>\r\n\r\n<p><strong>Bron </strong> <br />\r\nWachttijdenonderzoek, Mediquest<br />\r\nWachttijdenregistratie NZa<br />\r\n<strong>Verslagjaar t/m</strong><br />\r\n2022<br />\r\n<strong>Laatste update gegevens </strong><br />\r\n24 mei 2023<br />\r\n<strong>Updatefrequentie </strong><br />\r\nJaarlijks<br />\r\n<strong>Meer info</strong><br />\r\n<a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\r\n",
"format":"volledige_html",
"processed":"<ul><li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\n</ul><p><strong>Bron </strong> <br />\nWachttijdenonderzoek, Mediquest<br />\nWachttijdenregistratie NZa<br /><strong>Verslagjaar t/m</strong><br />\n2022<br /><strong>Laatste update gegevens </strong><br />\n24 mei 2023<br /><strong>Updatefrequentie </strong><br />\nJaarlijks<br /><strong>Meer info</strong><br /><a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\n\n"
}
],
"field_par_text_bgcolor":[
{
"value":"bg-gray-lightest"
}
],
"field_par_text_position":[
{
"value":"below"
}
],
"field_par_title":[
{
"value":"Trend "
}
],
"field_par_title_class":[
],
"field_par_title_enable":[
{
"value":false
}
],
"field_par_title_tag":[
{
"value":"h4"
}
]
}
This is the sink part of ADF Pipeline:
Its saved in the data lake as a .json file.
Important: i only need to transform this part of the Json element: field_par_chart with "csv" element
The expected data structure must be like this (see image below) and saved as a text file.
Any suggestions how to do this with a ADF Pipeline and/or ADF Data Flow?
Many thanks for your time and effort!
If your target columns are limited and column names are known, then you can try the below approach.
In your expected output, 3 columns have same name, ADF or SQL won't support these. So, I have ignored the first array(column names array) from the field_par_chart
field.
As you want the data only from field_par_chart
field, remove the remaining fields from the source using select transformation.
Then I took 3 dervied column transformations.
derived column1:
It splits the string and generates array of arrays with below dynamic expression.
map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'"',''),']',''),','))
derived column2:
It skips the first sub array(array of column names) due to the same column names and unfolds all the sub arrays and converts them into rows like below using this dynamic expression.
unfold(slice(arr,2))
derived column3:
It generates the required columns from the row arrays. Here, I am giving the column names manually and converting the values from string to double. you can give whatever column names you want manually.
In the sink, give your target SQL table and give the mapping for only those 4 columns and remove the extra columns that we got from previous transformations.
Result:
Execute this dataflow by pipeline and you can load this data to your target SQL table.
My Dataflow JSON for your reference:
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "derivedColumn1"
},
{
"name": "select1"
},
{
"name": "derivedColumn2"
},
{
"name": "derivedColumn3"
}
],
"scriptLines": [
"source(output(",
" id as (value as integer)[],",
" uuid as (value as string)[],",
" revision_id as (value as integer)[],",
" langcode as (value as string)[],",
" type as (target_id as string, target_type as string, target_uuid as string)[],",
" status as (value as boolean)[],",
" created as (value as string, format as string)[],",
" parent_id as (value as string)[],",
" parent_type as (value as string)[],",
" parent_field_name as (value as string)[],",
" behavior_settings as (value as string[])[],",
" default_langcode as (value as boolean)[],",
" revision_translation_affected as (value as boolean)[],",
" content_translation_source as (value as string)[],",
" content_translation_outdated as (value as boolean)[],",
" content_translation_changed as (value as string, format as string)[],",
" field_par_chart as (csv as string, csv_url as string, config as string)[],",
" field_par_extra_info as (value as string)[],",
" field_par_hidden as (value as boolean)[],",
" field_par_text as (value as string, format as string, processed as string)[],",
" field_par_text_bgcolor as (value as string)[],",
" field_par_text_position as (value as string)[],",
" field_par_title as (value as string)[],",
" field_par_title_class as string[],",
" field_par_title_enable as (value as boolean)[],",
" field_par_title_tag as (value as string)[]",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false,",
" documentForm: 'singleDocument') ~> source1",
"select1 derive(arr = map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'\"',''),']',''),','))) ~> derivedColumn1",
"source1 select(mapColumn(",
" field_par_chart_csv = field_par_chart[1].csv",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"derivedColumn1 derive(new_arr = unfold(slice(arr,2))) ~> derivedColumn2",
"derivedColumn2 derive(Category = new_arr[1],",
" {Percentage behandeling1} = toDouble(new_arr[2]),",
" {Percentage behandeling2} = toDouble(new_arr[3]),",
" {Percentage behandeling3} = toDouble(new_arr[4])) ~> derivedColumn3",
"derivedColumn3 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" recreate:true,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError',",
" mapColumn(",
" Category,",
" {Percentage behandeling1},",
" {Percentage behandeling2},",
" {Percentage behandeling3}",
" )) ~> sink1"
]
}
}
}