Tags: azure, azure-data-factory

ADF: how to make a pipeline with a data flow transformation that loads CSV files from blob storage into separate SQL tables dynamically


Hi, I am a complete beginner at ETL with ADF.

Situation

  • I populate a blob storage container with CSV files (with different file structures).
  • The CSV files are refreshed daily (same name, with more data), and sometimes a new file is added.
  • All the CSV files have headers wrapped in square brackets and separated by pipes, e.g. [Type adres]|[Version]|[Omschrijving]
  • For every CSV there is a metadata file ending in _metada.csv (out of scope for now; I can filter them out).

Desired outcome

  1. I want to copy these CSV files from blob storage to Azure SQL with ADF, using one pipeline that dynamically adds new tables as new CSV files appear.
  2. During copying, replace the [ and ] in the headers with nothing (otherwise SQL has to quote such names, turning [Type adres] into [[Type adres]]], which gives errors).
  3. Keep the CSV file name as the table name, with the extra prefix SystemA.

What I have so far (with some help)

  • I got a data flow working for one file that strips the brackets [] from the headers (see code 1).
  • I got a pipeline working with a copy activity (see picture 1).

Picture 1: [screenshot of the pipeline]

Now I am trying to combine the two into one dynamic pipeline.

Question

Probably I am doing something wrong in the variable/parameter settings of the data flow's source dataset, or in how I pass them through. How do I set this up?

Code 1: data flow for one file (working)

{
    "name": "DF_SystemA_CSV",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DS_Blob_Csv_SystemA_land",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DS_SystemA_Land",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          {[Type adres]} as string,",
                "          {[Version]} as string,",
                "          {[Omschrijving]} as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 select(mapColumn(",
                "          each(match(true()),",
                "               replace(replace($$,'[',''),']','') = $$)",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     recreate:true,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
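
For reference, the select1 transformation above uses rule-based mapping: each(match(true()), ...) applies one rule to every incoming column, with $$ standing for the matched column, so replace(replace($$,'[',''),']','') = $$ names each output column by stripping the brackets from the original name. The effect on the sample columns:

    [Type adres]   ->  Type adres
    [Version]      ->  Version
    [Omschrijving] ->  Omschrijving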

======== Follow-up: edited 11 December 2023 ========

Hi @Pratik Lad, thank you for your response. I adjusted the pipeline to your example (thanks for the JSON code) with only the variables, and removed the parameters from the dataset. I still ran into an error in the data flow, so I can't tell yet whether the solution works. The purpose of the data flow I call in the pipeline is to replace the [ and ] characters with nothing in the column headers (the first row of the files), to avoid errors in the column names in SQL. But I get the error: "Job failed at sink 'sink': The identifier ... is too long. Maximum is 128."

Please forgive my ignorance, but is that because I have more than 128 columns in the table, or because the select function creates one cell with more than 128 characters? And how do I solve this?

{
"name": "DF_Beaufort_CSV_to_SQL_copy1",
"properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
        "sources": [
            {
                "dataset": {
                    "referenceName": "DS_S_Beaufort_YFDienstverbanden",
                    "type": "DatasetReference"
                },
                "name": "source1"
            }
        ],
        "sinks": [
            {
                "dataset": {
                    "referenceName": "DS_D_Beaufort_SQL",
                    "type": "DatasetReference"
                },
                "name": "sink1"
            }
        ],
        "transformations": [
            {
                "name": "select1"
            }
        ],
        "scriptLines": [
            "source(output(",
            "          {[persoonsnummer ]|[Volgnr dienstverband]|[Version]|[Objectid]|[Pslot_sel]|[Current niveau autorisatie]|[Code regeling ORD]|[Code regeling OVW]|[Code regeling AVBAS]|[Code regeling BAW]|[Code regeling SLD]|[Code regeling VBD]|[Code regeling VSD]|[Code regeling ALG]|[Code regeling OVBAS]|[Soort arbeidsrelatie]|[Opdrachtgever]|[Instelling]|[Registratienummer]|[Datum in dienst]|[Organisatorische eenheid]|[Functie]|[Opzegtermijn in mnd]|[Kode normuren]|[Uren per week]|[Percentage deelbetrekking]|[Werkdagen per week]|[Doelgroep]|[Koppelen]|[Datum uit dienst]|[Reden einde dienstverband]|[Soort loner]|[Code salarisregeling]|[Schaal]|[Ancienniteit/functiejaren]|[Inpassingsnummer]|[Extra treden]|[Nominaal salaris]|[Nominaal uurloon]|[Blokkade aanvulling min.loon]|[Blokkade jeugdaftrek]|[Afw. %jeugdaftrek]|[Garantie schaal]|[Ancienniteit]|[Garantie inpasnummer]|[Nominaal garantiesalaris]|[Code deeldiensttijd]|[% Wachtgeld]|[Soort diensttijd ABP]|[Expiratiedatum ABP]|[Garantiesalaris]|[Garantie uurloon]|[Bruto salaris]|[Bruto uurloon]|[Sprongen periodiek]|[Maand periodiek]|[Blokkade periodiek]|[Sprongen periodiek garantiesalaris]|[Maand periodiek garantiesalaris]|[Laatste periodiek]|[Loonheffingskorting]|[Soort loonheffingstabel]|[Procentuele inschaling]|[Code tariefgroep beschikking]|[Soort werknemersverzekering]|[Afwijkende SV-dagen]|[Soort pensioenfonds]|[Datum intreding pensioenfonds]|[Datum uittreding pensioenfonds]|[Reden uittreding pensioenfonds]|[Code verlofregeling]|[Keuze feestdagenset]|[Datum in dienst CAO]|[Ingangsdatum werkpatroon]|[Datum/tijd laatste herberekening salaris ]|[Basissalaris/uurloon ]|[Jeugdkorting ]|[Deeltijd salaris/uurloon ]|[Basis garantiesalaris/uurloon ]|[Jeugdkorting  garantiesalaris/uurloon]|[Aanvulling garantiesalaris / uurloon ]|[Aanvulling minimumloon/uurloon ]|[Basissalaris/uurloon  eerstvolgende period.]|[Basis garantiesalaris/uurloon eerstvolgende period. ]|[Beroep]|[Blokkade ziekteregistratie]|[Ingangsdatum WAO]|[% WAO]|[% WAO uitkering]|[WAO uitkering]|[Ingangsdatum herintreding WAO]|[Ingangsperiode blokkade ziekte salarisverwerker]|[Afw. wachtdagen ZW uitkering]|[Zorgverzekeraar]|[Inschrijfnr.]|[Ing. datum]|[Einddatum]|[Reden]|[Afw. reden]|[Volgnr IKV]|[Volgnr PGGM]|[Percentage Verhoging]|[Code tellen SV-dagen en uren uurloners]|[Aanmaken variabele LV mutaties]|[Blokkeren verdeling TR]|[Verdeelgroep]|[Instellingsnr. hoofddienstvb.]|[Registratienr. hoofddienstvb.]|[FA pakket]|[Status FA]|[Verlofrecht]|[Gewenst aantal vakantiedagen]|[Verlofrecht verkocht]|[Saldo spaarverlof]|[Spaarverlof mutatie]|[Huidige FA versie]|[Perc. deelbetr. F&O]|[Perc. deelbetr. B&B]|[Perc. deelbetr. verlof]|[Reisafstand woon-werk]|[Aantal reisdagen]|[Aantal niet-reisdagen]|[Vergoeding woon-werk]|[ind.blk.fsc.vrije reisk.]|[Percentage Merit-rating]|[Correctie basissalaris]|[Garantietoelage]|[Levensloopverlofkorting]|[Korting uitkering]|[Perc doorbet. langdurig ziek]|[Vrijw voortzetting PGGM]|[Reden uittrede PGGM]|[Soort inkomstenverhouding]|[Aard arbeidsverhouding]|[Invloed verzekeringsplicht]|[Soort contract]|[Code inkomstenvermindering]|[Arbeidsgehandicaptenkorting]|[Ind. loon incl. WAJONG]|[Ind.  personeelslening]|[Afwijk. risicopremie groep]|[Speciale werknemer]|[Jaarloon  bijz. beloning]|[Perc. bijzonder tarief]|[Code blokk. voordeel]|[Loonheffing Blokkering]|[Code bedrijfsauto]|[Code netto loner]|[52% trf loonheffing]|[Vermindering loonh.]|[WIA IVA Uitkering]|[WIA WGA Uitkering]|[Gekoppeld dienstverband indicatie]|[Ind.ber.verlofr.meeruren]} as string",
            "     ),",
            "     allowSchemaDrift: true,",
            "     validateSchema: false,",
            "     ignoreNoFilesFound: false) ~> source1",
            "source1 select(mapColumn(",
            "          each(match(true()),",
            "               replace(replace($$,'[',''),']','') = $$)",
            "     ),",
            "     skipDuplicateMapInputs: true,",
            "     skipDuplicateMapOutputs: true) ~> select1",
            "select1 sink(allowSchemaDrift: true,",
            "     validateSchema: false,",
            "     deletable:false,",
            "     insertable:true,",
            "     updateable:false,",
            "     upsertable:false,",
            "     recreate:true,",
            "     format: 'table',",
            "     skipDuplicateMapInputs: true,",
            "     skipDuplicateMapOutputs: true,",
            "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
        ]
    }
}

}
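
For context: SQL Server limits identifier names (including column names) to 128 characters. In the script above, the entire header row — {[persoonsnummer ]|[Volgnr dienstverband]|...} — is being treated as a single column name, which suggests the source dataset is not splitting columns on the | delimiter. A minimal sketch of the relevant DelimitedText dataset settings, assuming the files are pipe-delimited:

"typeProperties": {
    "columnDelimiter": "|",
    "firstRowAsHeader": true
}

With the delimiter set, each bracketed header becomes its own column and the concatenated name no longer hits the 128-character limit.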


Solution

  • To use a data flow for dynamic transformation, you need to add parameterized datasets to the data flow.

    • Dataset for the input file: [screenshot: blob dataset with a filename parameter]
    • Dataset for the output SQL table: [screenshot: SQL dataset with a tablename parameter]
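
    A minimal sketch of what these two parameterized datasets could look like (the linked service names LS_BlobStorage and LS_AzureSql, the container name, and the dbo schema are assumptions; the parameter names filename and tablename must match the datasetParameters passed by the pipeline below):

    {
        "name": "DS_Blob_Csv_SystemA_land",
        "properties": {
            "linkedServiceName": {
                "referenceName": "LS_BlobStorage",
                "type": "LinkedServiceReference"
            },
            "parameters": {
                "filename": { "type": "string" }
            },
            "type": "DelimitedText",
            "typeProperties": {
                "location": {
                    "type": "AzureBlobStorageLocation",
                    "fileName": {
                        "value": "@dataset().filename",
                        "type": "Expression"
                    },
                    "container": "land"
                },
                "columnDelimiter": "|",
                "firstRowAsHeader": true
            },
            "schema": []
        }
    }

    {
        "name": "DS_SystemA_Land",
        "properties": {
            "linkedServiceName": {
                "referenceName": "LS_AzureSql",
                "type": "LinkedServiceReference"
            },
            "parameters": {
                "tablename": { "type": "string" }
            },
            "type": "AzureSqlTable",
            "typeProperties": {
                "schema": "dbo",
                "table": {
                    "value": "@dataset().tablename",
                    "type": "Expression"
                }
            },
            "schema": []
        }
    }

    Leaving the dataset schema empty and relying on allowSchemaDrift in the data flow lets the same pair of datasets serve files with different structures.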

    Set the values for these parameters from the variables you stored in the pipeline: [screenshot: Execute Data Flow activity settings]

    Mypipeline.json:

    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Get Metadata1",
                    "type": "GetMetadata",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "DelimitedText1",
                            "type": "DatasetReference"
                        },
                        "fieldList": [
                            "childItems"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Get Metadata1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Get Metadata1').output.childItems",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "filename",
                                "type": "SetVariable",
                                "dependsOn": [],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "filename",
                                    "value": {
                                        "value": "@item().name",
                                        "type": "Expression"
                                    }
                                }
                            },
                            {
                                "name": "tablename",
                                "type": "SetVariable",
                                "dependsOn": [
                                    {
                                        "activity": "filename",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "tablename",
                                    "value": {
                                        "value": "@concat(split(item().name,'.')[0],'SystemA')",
                                        "type": "Expression"
                                    }
                                }
                            },
                            {
                                "name": "Data flow1",
                                "type": "ExecuteDataFlow",
                                "dependsOn": [
                                    {
                                        "activity": "tablename",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "dataflow": {
                                        "referenceName": "dataflow1",
                                        "type": "DataFlowReference",
                                        "datasetParameters": {
                                            "source1": {
                                                "filename": {
                                                    "value": "@variables('filename')",
                                                    "type": "Expression"
                                                }
                                            },
                                            "sink1": {
                                                "tablename": {
                                                    "value": "@variables('tablename')",
                                                    "type": "Expression"
                                                }
                                            }
                                        }
                                    },
                                    "compute": {
                                        "coreCount": 8,
                                        "computeType": "General"
                                    },
                                    "traceLevel": "Fine"
                                }
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "filename": {
                    "type": "String"
                },
                "tablename": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
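
    The _metada.csv files mentioned in the question can be excluded before the ForEach with a Filter activity. A minimal sketch (the activity name FilterOutMetadata is an assumption); the ForEach items expression would then become @activity('FilterOutMetadata').output.value instead of the Get Metadata output:

    {
        "name": "FilterOutMetadata",
        "type": "Filter",
        "dependsOn": [
            {
                "activity": "Get Metadata1",
                "dependencyConditions": [ "Succeeded" ]
            }
        ],
        "userProperties": [],
        "typeProperties": {
            "items": {
                "value": "@activity('Get Metadata1').output.childItems",
                "type": "Expression"
            },
            "condition": {
                "value": "@not(endswith(item().name,'_metada.csv'))",
                "type": "Expression"
            }
        }
    }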
    

    Output

    [screenshot of the pipeline output]