Objective: For each sub-directory in a root directory (DIRECTORY-B), get all the child items iteratively. For this, I have an SFTP folder with the below hierarchy:
SFTP_FOLDER
- DIRECTORY-A
- DIRECTORY-B
  - Sub-Directory I
    - a.csv
    - b.csv
  - Sub-Directory II
    - c.csv
  - Sub-Directory III
    - d.csv
    - e.csv
- DIRECTORY-C
To do this, I've set up a two-step approach, GetMetadata > ForEach(GetMetadata), to get all the files in each sub-directory.
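Roughly, the wiring I am attempting looks like the sketch below (the dataset name SftpDirectoryB and the ForEach name are placeholders):

{
    "activities": [
        {
            "name": "GET Directory Structure",
            "type": "GetMetadata",
            "typeProperties": {
                "dataset": { "referenceName": "SftpDirectoryB", "type": "DatasetReference" },
                "fieldList": [ "childItems" ]
            }
        },
        {
            "name": "ForEach1",
            "type": "ForEach",
            "dependsOn": [
                { "activity": "GET Directory Structure", "dependencyConditions": [ "Succeeded" ] }
            ],
            "typeProperties": {
                "items": {
                    "value": "@activity('GET Directory Structure').output.childItems",
                    "type": "Expression"
                },
                "activities": [
                    { "name": "GET File List", "type": "GetMetadata" }
                ]
            }
        }
    ]
}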
I'm able to get all the directories from GET Directory Structure; however, I am not able to parametrize the directory name in GET File List, which is configured like:
Dataset Description:
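In essence, the dataset behind GET File List is parameterized along these lines (a sketch only; the dataset name, linked service name, and the parameter name dirName are placeholders):

{
    "name": "SftpSubDirectoryFiles",
    "properties": {
        "linkedServiceName": { "referenceName": "SftpLinkedService", "type": "LinkedServiceReference" },
        "parameters": { "dirName": { "type": "string" } },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "SftpLocation",
                "folderPath": {
                    "value": "@concat('SFTP_FOLDER/DIRECTORY-B/', dataset().dirName)",
                    "type": "Expression"
                }
            }
        }
    }
}

GET File List then requests the child items of this dataset, with dirName supplied from the ForEach (for example, @item().name).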
Though all the values are passed properly, the child items are not retrieved from the directory. My question: how do I make the folder name dynamic while keeping SFTP_FOLDER/DIRECTORY-B constant?
Alternatively, is there a simple way to recursively find the latest modified files in a directory and its subdirectories and copy them from source to sink?
NOTE: I have to go folder by folder; a wildcard like SFTP_FOLDER/DIRECTORY-B/*/*.csv is not applicable, as I want to retrieve only the latest file from each directory.
This question is similar to Q63566122; however, I have already implemented the process described there and am still not able to achieve the result.
It is better to use a data flow instead of Get Metadata here, so that you can use a wildcard path for the file path, as follows:
Create a dataset on the SFTP folder and select it as the data flow source dataset. Go to the source settings and use the wildcard path DIRECTORY-B/*/*.csv. Add the file-path column (Column to store file name), and if you want, you can also add Start time (subDays(currentUTC(),1)) and End time (currentUTC()) filters for the files, but make sure to change the expressions as per your time window, as shown below:
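In data flow script terms, these source settings correspond to the lines below (an excerpt of the full data flow JSON at the end of this answer):

source(output(
     Id as string,
     Name as string
     ),
     allowSchemaDrift: true,
     validateSchema: false,
     ignoreNoFilesFound: false,
     rowUrlColumn: 'filepath',
     modifiedAfter: (subDays(currentUTC(),1) ),
     modifiedBefore: (currentUTC()),
     wildcardPaths:['DIRECTORY-B/*/*.csv']) ~> source1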
Here, regardless of the file structure, all rows will be merged, and a column containing the file path will be added to each row, as shown below:
Use an aggregate transformation, group by the filepath column, and create a placeholder aggregate column (a count, for example). Keep only the filepath column using a select transformation, add a sink transformation, and select Cache as the sink type. Check Write to activity output as shown below:
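The aggregate, select, and cache sink from these steps appear in the same data flow script as:

source1 aggregate(groupBy(filepath),
     cnt = count(filepath)) ~> aggregate1
aggregate1 select(mapColumn(
     filepath
     ),
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true) ~> select1
select1 sink(validateSchema: false,
     skipDuplicateMapInputs: true,
     skipDuplicateMapOutputs: true,
     store: 'cache',
     format: 'inline',
     output: true,
     saveOrder: 1) ~> sink1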
If you execute the data flow, you will get the output as shown below:
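Because the cache sink has Write to activity output checked, the rows land in the activity output under runStatus.output.sink1.value as an array of objects with a single filepath column; with the hierarchy from the question it looks roughly like this (illustrative values; the exact path format depends on how the source dataset's folder is set):

[
    { "filepath": "DIRECTORY-B/Sub-Directory I/a.csv" },
    { "filepath": "DIRECTORY-B/Sub-Directory I/b.csv" },
    { "filepath": "DIRECTORY-B/Sub-Directory II/c.csv" },
    { "filepath": "DIRECTORY-B/Sub-Directory III/d.csv" },
    { "filepath": "DIRECTORY-B/Sub-Directory III/e.csv" }
]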
To get the latest file from DIRECTORY-B, add a ForEach activity on the success of the data flow and use the below expression for its items:
@activity('Data flow1').output.runStatus.output.sink1.value
Add a Get Metadata activity inside the foreach. Select a dataset with a parameter for the file name and use the @item().filepath expression for that parameter. Add Last modified and Item name under the field list as shown below:
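For reference, DelimitedText6 in the pipeline JSON below is an SFTP delimited-text dataset whose file path is driven by a fileName parameter, roughly like this (a sketch; the linked service name and format settings are assumptions):

{
    "name": "DelimitedText6",
    "properties": {
        "linkedServiceName": { "referenceName": "SftpLinkedService", "type": "LinkedServiceReference" },
        "parameters": { "fileName": { "type": "string" } },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "SftpLocation",
                "fileName": { "value": "@dataset().fileName", "type": "Expression" }
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        }
    }
}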
Create two variables: refd, with a default value of 1900-01-01 00:00:00, to track the latest modified date, and ltf, to store the latest modified file name. Add an If Condition and use the below expression to compare each file's modified date with the refd variable value (the UTC timestamp strings compare in chronological order here):
@greater(activity('Get Metadata1').output.lastModified,variables('refd'))
If it is true, add a Set Variable activity inside the True branch. Select the refd variable and use the below expression as the value, so that the variable is updated with the last modified date:
@activity('Get Metadata1').output.lastModified
On the success of that Set Variable, add another Set Variable activity and select ltf. Use the below expression to store the latest modified file name:
@activity('Get Metadata1').output.itemName
On the success of the foreach, add a Set Variable activity and use the @variables('ltf') expression as its value to hold the latest modified file. If you debug the pipeline, you will get the latest modified file of DIRECTORY-B as shown below:
For your reference, here is the data flow JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText5",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "aggregate1"
                },
                {
                    "name": "select1",
                    "description": "Renaming aggregate1 to select1 with columns 'filepath'"
                }
            ],
            "scriptLines": [
                "source(output(",
                "     Id as string,",
                "     Name as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     rowUrlColumn: 'filepath',",
                "     modifiedAfter: (subDays(currentUTC(),1) ),",
                "     modifiedBefore: (currentUTC()),",
                "     wildcardPaths:['DIRECTORY-B/*/*.csv']) ~> source1",
                "source1 aggregate(groupBy(filepath),",
                "     cnt = count(filepath)) ~> aggregate1",
                "aggregate1 select(mapColumn(",
                "     filepath",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}
Pipeline JSON:
{
    "name": "subdf",
    "properties": {
        "activities": [
            {
                "name": "Data flow1",
                "type": "ExecuteDataFlow",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow1",
                        "type": "DataFlowReference"
                    },
                    "compute": {
                        "coreCount": 8,
                        "computeType": "General"
                    },
                    "traceLevel": "None",
                    "cacheSinks": {
                        "firstRowOnly": false
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Data flow1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Data flow1').output.runStatus.output.sink1.value\n",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get Metadata1",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "DelimitedText6",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "fileName": {
                                            "value": "@item().filepath",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "lastModified",
                                    "itemName"
                                ],
                                "storeSettings": {
                                    "type": "SftpReadSettings",
                                    "enablePartitionDiscovery": false,
                                    "disableChunking": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@greater(activity('Get Metadata1').output.lastModified,variables('refd'))",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "referencedate",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "policy": {
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "refd",
                                            "value": {
                                                "value": "@activity('Get Metadata1').output.lastModified",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "ltf",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "referencedate",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "policy": {
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "ltf",
                                            "value": {
                                                "value": "@activity('Get Metadata1').output.itemName",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "fnwlmd",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "fnwlmd",
                    "value": {
                        "value": "@variables('ltf')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "latest file": {
                "type": "Array"
            },
            "refd": {
                "type": "String",
                "defaultValue": "1900-01-01 00:00:00"
            },
            "ltf": {
                "type": "String"
            },
            "fnwlmd": {
                "type": "String"
            }
        },
        "annotations": []
    }
}