
Azure Data Factory - Get Metadata (Child Items) For Each Sub-Directory


Objective: For each sub-directory in a root directory (DIRECTORY-B), get all the child items iteratively. I have an SFTP folder with the following hierarchy:

SFTP_FOLDER
  - DIRECTORY-A
  - DIRECTORY-B
    - Sub-Directory I
      - a.csv
      - b.csv
    - Sub-Directory II
      - c.csv
    - Sub-Directory III
      - d.csv
      - e.csv
  - DIRECTORY-C

For this, I've set up a two-step approach: GetMetadata > ForEach(GetMetadata) to get all the files in each sub-directory.

ADF Pipeline Description

I'm able to get all the directories from GET Directory Structure; however, I am not able to parametrize the directory name in GET File List, which is configured like this:

GET File List - Settings

Dataset Description:

Dynamic Directory Settings

Pipeline Expression Builder

Though all the values are passed properly, the child items are not retrieved from the directory. My question: How can I make the folder name dynamic while keeping SFTP_FOLDER/DIRECTORY-B constant?

Alternatively, is there a simple way to recursively find and copy the latest modified files in a directory with subdirectories from source to sink?

NOTE: I have to go folder by folder; a wildcard such as SFTP_FOLDER/DIRECTORY-B/*/*.csv is not applicable because I want to retrieve only the latest file from each directory.

This question is similar to Q63566122. I have already implemented the process described there, but I am still not able to achieve the result.


Solution

  • Instead of Get Metadata, it is better to use a data flow, which lets you specify the file path as a wildcard path:

    Create a dataset that points to the SFTP folder and select it as the data flow source dataset. In the source settings, set the wildcard path to DIRECTORY-B/*/*.csv. Add a column to store the file name (filepath in this example). Optionally, you can also filter the files by last modified date with a Start time of subDays(currentUTC(),1) and an End time of currentUTC(); adjust these expressions to your own time window.

    Regardless of the folder structure, the rows from all matching files are merged, and each row gets a filepath column containing the path of the file it came from.

    Add an aggregate transformation, group by the filepath column, and create a placeholder aggregate column (cnt = count(filepath) in the script further down). Keep only the filepath column with a select transformation, add a sink transformation, and choose Cache as the sink type. Check Write to activity output.

    When you execute the data flow, the activity output contains one row per distinct file path.
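
    As a rough illustration of what the data flow does, the Python sketch below mimics the group-by on filepath followed by the select. The sample rows and paths are hypothetical, and the exact path format written to the filepath column may differ in your factory; only the column names Id, Name and filepath come from the script in this answer.

```python
# Hypothetical rows as the source might emit them: every data row carries
# the path of the file it was read from in the 'filepath' column.
rows = [
    {"Id": "1", "Name": "x", "filepath": "/DIRECTORY-B/Sub-Directory I/a.csv"},
    {"Id": "2", "Name": "y", "filepath": "/DIRECTORY-B/Sub-Directory I/a.csv"},
    {"Id": "3", "Name": "z", "filepath": "/DIRECTORY-B/Sub-Directory II/c.csv"},
]


def distinct_filepaths(rows):
    """Mimic aggregate(groupBy(filepath)) followed by select(filepath)."""
    seen = {}
    for row in rows:
        # A dict keeps first-seen order, so duplicates collapse to one key.
        seen.setdefault(row["filepath"], None)
    return [{"filepath": path} for path in seen]


# Shape of the array that the ForEach later iterates over:
# each element exposes a 'filepath' property.
sink_value = distinct_filepaths(rows)
```

    The sketch keeps first-seen order only for readability; the row order of the real data flow output should not be relied on.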

    To get the latest file from DIRECTORY-B, add a ForEach activity on success of the data flow, set it to run sequentially (the loop updates pipeline variables), and use the expression below for its items:

    @activity('Data flow1').output.runStatus.output.sink1.value
    

    Add a Get Metadata activity inside the ForEach. Select a dataset that has a parameter for the file name and pass the @item().filepath expression as its value. Add Last modified and Item name to the field list.

    Create two variables: refd, with a default value of 1900-01-01 00:00:00, to track the latest modified date seen so far, and ltf, to store the name of the latest modified file. Add an If Condition and use the expression below to compare each file's modified date with the refd variable:

    @greater(activity('Get Metadata1').output.lastModified,variables('refd'))
    

    If the condition is true, add a Set Variable activity inside the True branch. Select the refd variable and use the expression below as the value, so the variable is updated with the file's last modified date:

    @activity('Get Metadata1').output.lastModified
    

    On success of that Set Variable, add another Set Variable for ltf. Use the expression below to store the latest modified file name:

    @activity('Get Metadata1').output.itemName
    

    On success of the ForEach, add a Set Variable activity that assigns the @variables('ltf') expression to another variable (fnwlmd in the pipeline JSON further down) to store the latest modified file. If you debug the pipeline, you will get the latest modified file of DIRECTORY-B.
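
    The loop above is a running maximum over the Get Metadata results. The Python sketch below mirrors it under a few assumptions: the sample items and timestamps are made up, lastModified is assumed to arrive as an ISO 8601 string, and the comparison is a plain string comparison, which is why the 1900-01-01 00:00:00 default sorts before any realistic timestamp. The second function is not part of the pipeline in this answer; it is a hypothetical variant that keys the same comparison by parent folder, in case one latest file per sub-directory is needed, as the question asks.

```python
import posixpath

# Hypothetical Get Metadata results, one per file path from the data flow.
sample = [
    {"filepath": "/DIRECTORY-B/Sub-Directory I/a.csv", "itemName": "a.csv",
     "lastModified": "2023-03-01T08:00:00Z"},
    {"filepath": "/DIRECTORY-B/Sub-Directory I/b.csv", "itemName": "b.csv",
     "lastModified": "2023-03-04T09:30:00Z"},
    {"filepath": "/DIRECTORY-B/Sub-Directory II/c.csv", "itemName": "c.csv",
     "lastModified": "2023-03-02T12:00:00Z"},
    {"filepath": "/DIRECTORY-B/Sub-Directory III/d.csv", "itemName": "d.csv",
     "lastModified": "2023-03-05T07:15:00Z"},
    {"filepath": "/DIRECTORY-B/Sub-Directory III/e.csv", "itemName": "e.csv",
     "lastModified": "2023-03-03T18:45:00Z"},
]


def latest_file(items, refd="1900-01-01 00:00:00"):
    """Mirror the ForEach + If Condition: keep the newest file seen so far."""
    ltf = ""
    for item in items:  # sequential, like the ForEach with isSequential
        # String comparison; same-format ISO timestamps sort chronologically.
        if item["lastModified"] > refd:
            refd = item["lastModified"]  # the 'referencedate' Set Variable
            ltf = item["itemName"]       # the 'ltf' Set Variable
    return ltf, refd


def latest_file_per_folder(items):
    """Hypothetical variant: newest file within each parent folder."""
    newest = {}
    for item in items:
        folder = posixpath.dirname(item["filepath"])
        best = newest.get(folder)
        if best is None or item["lastModified"] > best["lastModified"]:
            newest[folder] = item
    return {folder: item["itemName"] for folder, item in newest.items()}
```

    With the sample data, latest_file(sample) picks d.csv, while latest_file_per_folder(sample) picks b.csv, c.csv and d.csv for the three sub-directories.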

    For reference, here is the data flow JSON:

    {
        "name": "dataflow1",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "DelimitedText5",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "name": "sink1"
                    }
                ],
                "transformations": [
                    {
                        "name": "aggregate1"
                    },
                    {
                        "name": "select1",
                        "description": "Renaming aggregate1 to select1 with columns 'filepath'"
                    }
                ],
                "scriptLines": [
                    "source(output(",
                    "          Id as string,",
                    "          Name as string",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     ignoreNoFilesFound: false,",
                    "     rowUrlColumn: 'filepath',",
                    "     modifiedAfter: (subDays(currentUTC(),1) ),",
                    "     modifiedBefore: (currentUTC()),",
                    "     wildcardPaths:['DIRECTORY-B/*/*.csv']) ~> source1",
                    "source1 aggregate(groupBy(filepath),",
                    "     cnt = count(filepath)) ~> aggregate1",
                    "aggregate1 select(mapColumn(",
                    "          filepath",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select1",
                    "select1 sink(validateSchema: false,",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true,",
                    "     store: 'cache',",
                    "     format: 'inline',",
                    "     output: true,",
                    "     saveOrder: 1) ~> sink1"
                ]
            }
        }
    }
    

    Pipeline JSON:

    {
        "name": "subdf",
        "properties": {
            "activities": [
                {
                    "name": "Data flow1",
                    "type": "ExecuteDataFlow",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataflow": {
                            "referenceName": "dataflow1",
                            "type": "DataFlowReference"
                        },
                        "compute": {
                            "coreCount": 8,
                            "computeType": "General"
                        },
                        "traceLevel": "None",
                        "cacheSinks": {
                            "firstRowOnly": false
                        }
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Data flow1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Data flow1').output.runStatus.output.sink1.value\n",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Get Metadata1",
                                "type": "GetMetadata",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "dataset": {
                                        "referenceName": "DelimitedText6",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "fileName": {
                                                "value": "@item().filepath",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    "fieldList": [
                                        "lastModified",
                                        "itemName"
                                    ],
                                    "storeSettings": {
                                        "type": "SftpReadSettings",
                                        "enablePartitionDiscovery": false,
                                        "disableChunking": false
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextReadSettings"
                                    }
                                }
                            },
                            {
                                "name": "If Condition1",
                                "type": "IfCondition",
                                "dependsOn": [
                                    {
                                        "activity": "Get Metadata1",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "userProperties": [],
                                "typeProperties": {
                                    "expression": {
                                        "value": "@greater(activity('Get Metadata1').output.lastModified,variables('refd'))",
                                        "type": "Expression"
                                    },
                                    "ifTrueActivities": [
                                        {
                                            "name": "referencedate",
                                            "type": "SetVariable",
                                            "dependsOn": [],
                                            "policy": {
                                                "secureOutput": false,
                                                "secureInput": false
                                            },
                                            "userProperties": [],
                                            "typeProperties": {
                                                "variableName": "refd",
                                                "value": {
                                                    "value": "@activity('Get Metadata1').output.lastModified",
                                                    "type": "Expression"
                                                }
                                            }
                                        },
                                        {
                                            "name": "ltf",
                                            "type": "SetVariable",
                                            "dependsOn": [
                                                {
                                                    "activity": "referencedate",
                                                    "dependencyConditions": [
                                                        "Succeeded"
                                                    ]
                                                }
                                            ],
                                            "policy": {
                                                "secureOutput": false,
                                                "secureInput": false
                                            },
                                            "userProperties": [],
                                            "typeProperties": {
                                                "variableName": "ltf",
                                                "value": {
                                                    "value": "@activity('Get Metadata1').output.itemName",
                                                    "type": "Expression"
                                                }
                                            }
                                        }
                                    ]
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "fnwlmd",
                    "type": "SetVariable",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "fnwlmd",
                        "value": {
                            "value": "@variables('ltf')",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "variables": {
                "latest file": {
                    "type": "Array"
                },
                "refd": {
                    "type": "String",
                    "defaultValue": "1900-01-01 00:00:00"
                },
                "ltf": {
                    "type": "String"
                },
                "fnwlmd": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }