Hi, I am a complete beginner with ETL in Azure Data Factory (ADF).
Situation
Desired outcome
Solution (with some help)
Now I try to insert the dataflow in the ForEach activity instead of the Copy Data activity, and to make the source dataset for the dataflow ready to receive the filename as a parameter from the pipeline on each iteration.
Dataset input / dataflow source settings:
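In short: the source dataset gets a string parameter (called filename here) that is used as the file name of the blob location, roughly like the sketch below. The linked service, container and delimiter shown are placeholders, not my exact values:

{
    "name": "DS_Blob_Csv_SystemA_land",
    "properties": {
        "linkedServiceName": {
            "referenceName": "LS_BlobStorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "filename": {
                "type": "string"
            }
        },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "AzureBlobStorageLocation",
                "fileName": {
                    "value": "@dataset().filename",
                    "type": "Expression"
                },
                "container": "landing"
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        }
    }
}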
Dataflow settings in the pipeline:
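In the dataflow activity inside the ForEach, the source dataset parameter should then be filled from the pipeline variable. A fragment of the activity JSON, assuming the dataflow is the one above and the parameter is named filename:

"dataflow": {
    "referenceName": "DF_SystemA_CSV",
    "type": "DataFlowReference",
    "datasetParameters": {
        "source1": {
            "filename": {
                "value": "@variables('filename')",
                "type": "Expression"
            }
        }
    }
}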
Question
Probably I am doing something wrong in the variable/parameter settings of the dataflow's source dataset, or in passing them through.
How do I set this up?
Dataflow code for one file (working):
{
    "name": "DF_SystemA_CSV",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DS_Blob_Csv_SystemA_land",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DS_SystemA_Land",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                }
            ],
            "scriptLines": [
                "source(output(",
                " {[Type adres]} as string,",
                " {[Version]} as string,",
                " {[Omschrijving]} as string",
                " ),",
                " allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false) ~> source1",
                "source1 select(mapColumn(",
                " each(match(true()),",
                " replace(replace($$,'[',''),']','') = $$)",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> select1",
                "select1 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:false,",
                " recreate:true,",
                " format: 'table',",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
======== Follow-up: edited 11 December 2023 ========
Hi @Pratik Lad, thank you for your response. I adjusted the pipeline to match your example (thanks for the JSON code), using only the variables, and removed the parameters from the dataset. I still ran into an error in the dataflow, so I can't see yet whether the solution works. The purpose of the dataflow I call in the pipeline is to replace the [ ] characters in the column headers (the first row of the files) with nothing, to avoid invalid column names in SQL. But I get the error: "Job failed at sink 'sink': The identifier ... is too long. Maximum is 128".
Please forgive my stupidity, but is that because I have more than 128 columns in the table, or because the select function creates a single cell with more than 128 characters? And how do I solve this?
{
"name": "DF_Beaufort_CSV_to_SQL_copy1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DS_S_Beaufort_YFDienstverbanden",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "DS_D_Beaufort_SQL",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "select1"
}
],
"scriptLines": [
"source(output(",
" {[persoonsnummer ]|[Volgnr dienstverband]|[Version]|[Objectid]|[Pslot_sel]|[Current niveau autorisatie]|[Code regeling ORD]|[Code regeling OVW]|[Code regeling AVBAS]|[Code regeling BAW]|[Code regeling SLD]|[Code regeling VBD]|[Code regeling VSD]|[Code regeling ALG]|[Code regeling OVBAS]|[Soort arbeidsrelatie]|[Opdrachtgever]|[Instelling]|[Registratienummer]|[Datum in dienst]|[Organisatorische eenheid]|[Functie]|[Opzegtermijn in mnd]|[Kode normuren]|[Uren per week]|[Percentage deelbetrekking]|[Werkdagen per week]|[Doelgroep]|[Koppelen]|[Datum uit dienst]|[Reden einde dienstverband]|[Soort loner]|[Code salarisregeling]|[Schaal]|[Ancienniteit/functiejaren]|[Inpassingsnummer]|[Extra treden]|[Nominaal salaris]|[Nominaal uurloon]|[Blokkade aanvulling min.loon]|[Blokkade jeugdaftrek]|[Afw. %jeugdaftrek]|[Garantie schaal]|[Ancienniteit]|[Garantie inpasnummer]|[Nominaal garantiesalaris]|[Code deeldiensttijd]|[% Wachtgeld]|[Soort diensttijd ABP]|[Expiratiedatum ABP]|[Garantiesalaris]|[Garantie uurloon]|[Bruto salaris]|[Bruto uurloon]|[Sprongen periodiek]|[Maand periodiek]|[Blokkade periodiek]|[Sprongen periodiek garantiesalaris]|[Maand periodiek garantiesalaris]|[Laatste periodiek]|[Loonheffingskorting]|[Soort loonheffingstabel]|[Procentuele inschaling]|[Code tariefgroep beschikking]|[Soort werknemersverzekering]|[Afwijkende SV-dagen]|[Soort pensioenfonds]|[Datum intreding pensioenfonds]|[Datum uittreding pensioenfonds]|[Reden uittreding pensioenfonds]|[Code verlofregeling]|[Keuze feestdagenset]|[Datum in dienst CAO]|[Ingangsdatum werkpatroon]|[Datum/tijd laatste herberekening salaris ]|[Basissalaris/uurloon ]|[Jeugdkorting ]|[Deeltijd salaris/uurloon ]|[Basis garantiesalaris/uurloon ]|[Jeugdkorting garantiesalaris/uurloon]|[Aanvulling garantiesalaris / uurloon ]|[Aanvulling minimumloon/uurloon ]|[Basissalaris/uurloon eerstvolgende period.]|[Basis garantiesalaris/uurloon eerstvolgende period. ]|[Beroep]|[Blokkade ziekteregistratie]|[Ingangsdatum WAO]|[% WAO]|[% WAO uitkering]|[WAO uitkering]|[Ingangsdatum herintreding WAO]|[Ingangsperiode blokkade ziekte salarisverwerker]|[Afw. wachtdagen ZW uitkering]|[Zorgverzekeraar]|[Inschrijfnr.]|[Ing. datum]|[Einddatum]|[Reden]|[Afw. reden]|[Volgnr IKV]|[Volgnr PGGM]|[Percentage Verhoging]|[Code tellen SV-dagen en uren uurloners]|[Aanmaken variabele LV mutaties]|[Blokkeren verdeling TR]|[Verdeelgroep]|[Instellingsnr. hoofddienstvb.]|[Registratienr. hoofddienstvb.]|[FA pakket]|[Status FA]|[Verlofrecht]|[Gewenst aantal vakantiedagen]|[Verlofrecht verkocht]|[Saldo spaarverlof]|[Spaarverlof mutatie]|[Huidige FA versie]|[Perc. deelbetr. F&O]|[Perc. deelbetr. B&B]|[Perc. deelbetr. verlof]|[Reisafstand woon-werk]|[Aantal reisdagen]|[Aantal niet-reisdagen]|[Vergoeding woon-werk]|[ind.blk.fsc.vrije reisk.]|[Percentage Merit-rating]|[Correctie basissalaris]|[Garantietoelage]|[Levensloopverlofkorting]|[Korting uitkering]|[Perc doorbet. langdurig ziek]|[Vrijw voortzetting PGGM]|[Reden uittrede PGGM]|[Soort inkomstenverhouding]|[Aard arbeidsverhouding]|[Invloed verzekeringsplicht]|[Soort contract]|[Code inkomstenvermindering]|[Arbeidsgehandicaptenkorting]|[Ind. loon incl. WAJONG]|[Ind. personeelslening]|[Afwijk. risicopremie groep]|[Speciale werknemer]|[Jaarloon bijz. beloning]|[Perc. bijzonder tarief]|[Code blokk. voordeel]|[Loonheffing Blokkering]|[Code bedrijfsauto]|[Code netto loner]|[52% trf loonheffing]|[Vermindering loonh.]|[WIA IVA Uitkering]|[WIA WGA Uitkering]|[Gekoppeld dienstverband indicatie]|[Ind.ber.verlofr.meeruren]} as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"source1 select(mapColumn(",
" each(match(true()),",
" replace(replace($$,'[',''),']','') = $$)",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"select1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" recreate:true,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> sink1"
]
}
}
}
To use a data flow for dynamic transformation, you need to add a parameterized dataset to the data flow.
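Concretely, the dataset itself declares the parameter and uses it in its type properties, for example like this fragment (a sketch for a blob source; use your own dataset and parameter names):

"parameters": {
    "filename": {
        "type": "string"
    }
},
"typeProperties": {
    "location": {
        "type": "AzureBlobStorageLocation",
        "fileName": {
            "value": "@dataset().filename",
            "type": "Expression"
        }
    }
}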
Then set the values of these parameters from the variables you populated in the pipeline:
Mypipeline.json:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobStorageReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "filename",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "filename",
                                "value": {
                                    "value": "@item().name",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "tablename",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "filename",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "tablename",
                                "value": {
                                    "value": "@concat(split(item().name,'.')[0],'SystemA')",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Data flow1",
                            "type": "ExecuteDataFlow",
                            "dependsOn": [
                                {
                                    "activity": "tablename",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataflow": {
                                    "referenceName": "dataflow1",
                                    "type": "DataFlowReference",
                                    "datasetParameters": {
                                        "source1": {
                                            "filename": {
                                                "value": "@variables('filename')",
                                                "type": "Expression"
                                            }
                                        },
                                        "sink1": {
                                            "tablename": {
                                                "value": "@variables('tablename')",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "compute": {
                                    "coreCount": 8,
                                    "computeType": "General"
                                },
                                "traceLevel": "Fine"
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "filename": {
                "type": "String"
            },
            "tablename": {
                "type": "String"
            }
        },
        "annotations": []
    }
}
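For the datasetParameters in the pipeline above to resolve, both datasets must declare matching parameters: filename on the source dataset and tablename on the sink dataset. A sketch of such a sink dataset (the dataset, linked service and schema names here are placeholders):

{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "tablename": {
                "type": "string"
            }
        },
        "type": "AzureSqlTable",
        "typeProperties": {
            "schema": "dbo",
            "table": {
                "value": "@dataset().tablename",
                "type": "Expression"
            }
        }
    }
}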
Output