Tags: azure, etl, azure-data-lake, azure-data-factory

How to automate daily copying of files stored in daily folders using Azure Data Factory


I have the following scenario: in a Data Lake Gen2, files are generated daily and stored in a folder structure such as yyyy/mm/dd. My task is to pick up all the files from the previous day's folder, merge them, and move the result to another location in the Data Lake Gen2.

What would be the right approach? Can I use ADF exclusively, or does it have to be combined with some sort of control table holding the values from the last processed day? For example, if there was a problem and files need to be processed from 3 days ago instead of yesterday, how do I know? How do I keep track of that, and what would be the approach to automating such a process? I imagine this is a scenario that has been built many times. Thanks for all your help.


Solution

    • You can use dynamic content to get yesterday's date, with which you will be able to read all the files inside the folder structure yyyy/MM/dd.
    @formatDateTime(addDays(utcNow(),-1))
    

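    Note that formatDateTime without an explicit format string returns the timestamp in its default ISO 8601 form; that is enough here because the wildcard path in the next step re-formats the stored value into year, month and day parts. As a minimal alternative sketch (not part of the original pipeline), the whole folder suffix could also be built in a single expression:
    @formatDateTime(addDays(utcNow(),-1),'yyyy/MM/dd')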


    • You can use the dynamically generated folder structure as the wildcard path to read all the files in the required folder; an example of the resolved path is shown below.
    source/@{formatDateTime(variables('yesterday'),'yyyy')}/@{formatDateTime(variables('yesterday'),'MM')}/@{formatDateTime(variables('yesterday'),'dd')}
    

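    For example, if the pipeline runs on 2023-05-15 (UTC), the variable holds the previous day and the wildcard path above resolves to the following (the date is purely illustrative):
    source/2023/05/14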


    • In the sink, you can select your destination folder and set the copy behavior to Merge files (give the required filename in the sink dataset, otherwise a random name will be generated); a minimal sketch of such a sink dataset is shown below.

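    In the following sketch, the linked service, container, folder and file names are assumptions and should be replaced with your own:
    {
        "name": "dest",
        "properties": {
            "linkedServiceName": {
                "referenceName": "AzureBlobStorage1",
                "type": "LinkedServiceReference"
            },
            "annotations": [],
            "type": "DelimitedText",
            "typeProperties": {
                "location": {
                    "type": "AzureBlobStorageLocation",
                    "fileName": "merged.csv",
                    "folderPath": "merged",
                    "container": "destination"
                },
                "columnDelimiter": ",",
                "firstRowAsHeader": true
            },
            "schema": []
        }
    }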


    • To automate this process daily, you can use a schedule trigger. Navigate to Add trigger -> New/Edit -> Choose new trigger, select the interval at which you want to run this pipeline (one day), and create the trigger; a sketch of the resulting trigger definition is shown below.

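    The trigger name and start time in the following sketch are placeholders:
    {
        "name": "DailyMergeTrigger",
        "properties": {
            "annotations": [],
            "runtimeState": "Started",
            "pipelines": [
                {
                    "pipelineReference": {
                        "referenceName": "pipeline1",
                        "type": "PipelineReference"
                    }
                }
            ],
            "type": "ScheduleTrigger",
            "typeProperties": {
                "recurrence": {
                    "frequency": "Day",
                    "interval": 1,
                    "startTime": "2023-05-15T01:00:00Z",
                    "timeZone": "UTC"
                }
            }
        }
    }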


    • Publish the pipeline. The trigger will then run it daily, merging the files from yesterday's folder into a new merged file in the destination. The following is the pipeline JSON for reference:
    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Copy data1",
                    "type": "Copy",
                    "dependsOn": [
                        {
                            "activity": "yesterdays date",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobStorageReadSettings",
                                "recursive": true,
                                "wildcardFolderPath": {
                                    "value": "source/@{formatDateTime(variables('yesterday'),'yyyy')}/@{formatDateTime(variables('yesterday'),'MM')}/@{formatDateTime(variables('yesterday'),'dd')}",
                                    "type": "Expression"
                                },
                                "wildcardFileName": "*.csv",
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "sink": {
                            "type": "DelimitedTextSink",
                            "storeSettings": {
                                "type": "AzureBlobStorageWriteSettings",
                                "copyBehavior": "MergeFiles"
                            },
                            "formatSettings": {
                                "type": "DelimitedTextWriteSettings",
                                "quoteAllText": true,
                                "fileExtension": ".txt"
                            }
                        },
                        "enableStaging": false,
                        "translator": {
                            "type": "TabularTranslator",
                            "typeConversion": true,
                            "typeConversionSettings": {
                                "allowDataTruncation": true,
                                "treatBooleanAsNumber": false
                            }
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "src",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "dest",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "yesterdays date",
                    "type": "SetVariable",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "yesterday",
                        "value": {
                            "value": "@formatDateTime(addDays(utcNow(),-1))",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "variables": {
                "yesterday": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
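

    • To cover the reprocessing case from the question (processing a day other than yesterday) without a separate control table, one option is to parameterize the day offset. This is only a sketch, assuming an integer pipeline parameter named dayOffset with a default value of -1 is added to the pipeline:
    "parameters": {
        "dayOffset": {
            "type": "int",
            "defaultValue": -1
        }
    }

    The Set Variable expression then becomes:
    @formatDateTime(addDays(utcNow(), pipeline().parameters.dayOffset))

    With this, the scheduled run keeps processing yesterday's folder using the default, while a manual re-run with dayOffset set to -3 picks up the folder from three days ago.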