I have the notebook job below in ADF, which loads an Excel/CSV file into blob storage.
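from datetime import date
today = date.today()  # assumed definition; `today` was not shown in the original snippet
sheetId = '33222424'
load_time = today.strftime("%Y%m%d")  # e.g. 20240112
path = 'my_path'
smartsheet_client.Sheets.get_sheet_as_excel(sheetId, path, 'my_file_' + load_time + '.csv')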
The above code creates a CSV file named with the current date and writes it to the given path in blob storage.
Next, I want to load this file into a Snowflake table. Since the blob storage contains many other files, I want to load only the most recently created file into Snowflake.
Output file name in blob storage: my_file_20240112.csv
The file contains the following data:
id | Name
101 | John
102 | Hyden
103 | Vivek
Which activity can I use to load the latest file from the blob path?
What is the best approach for this?
As you want to get the latest file name by both the date in the file name and the load time, you need to use a loop to find the required file name.
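For intuition only, the "date in the file name" part can be sketched in plain Python outside ADF. The my_file_ prefix and the YYYYMMDD format come from the question; the function name latest_by_name_date, the regular expression, and the sample names are hypothetical.

```python
import re
from datetime import datetime

# Pattern assumed from the question's example name: my_file_20240112.csv
NAME_PATTERN = re.compile(r"^my_file_(\d{8})\.csv$")

def latest_by_name_date(names):
    """Return the name whose embedded YYYYMMDD date is most recent, or None."""
    dated = []
    for name in names:
        match = NAME_PATTERN.match(name)
        if match is None:
            continue  # unrelated file in the same folder
        try:
            parsed = datetime.strptime(match.group(1), "%Y%m%d")
        except ValueError:
            continue  # eight digits that do not form a valid date
        dated.append((parsed, name))
    return max(dated)[1] if dated else None

# Hypothetical folder listing
sample = ["my_file_20240110.csv", "other_20240115.csv", "my_file_20240112.csv"]
print(latest_by_name_date(sample))
```

This only looks at the name. The pipeline described in this answer compares the last modified time instead, which also covers files whose names carry no usable date.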
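First, create the following variables in the pipeline: mydate (String, default value 2022-10-20), latest_filename (String) and res_filename (String).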
Now create two datasets: one for getting the child items from the input path, and another for looping through those files.
In the first dataset, give the path only up to the folder that holds the source files, and use this dataset in the first Get Metadata activity. In that activity, set the Start time (under Filter by last modified) to the dynamic content @adddays(utcnow(),-1). This keeps only the files that were modified in the last day.
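As a rough illustration of what the Start time filter does, here is a minimal Python sketch. It assumes the start bound is inclusive, and it uses a fixed timestamp in place of utcnow() so the result is reproducible; the function name modified_since and the sample entries are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def modified_since(files, start):
    """Keep entries whose last modified time is at or after `start`."""
    return [f for f in files if f["lastModified"] >= start]

# Fixed stand-in for utcnow(), so the example is deterministic
now = datetime(2024, 1, 12, 9, 0, tzinfo=timezone.utc)
start = now - timedelta(days=1)  # same idea as adddays(utcnow(), -1)

# Hypothetical folder contents
files = [
    {"name": "my_file_20240112.csv",
     "lastModified": datetime(2024, 1, 12, 8, 0, tzinfo=timezone.utc)},
    {"name": "my_file_20240105.csv",
     "lastModified": datetime(2024, 1, 5, 8, 0, tzinfo=timezone.utc)},
]

recent = modified_since(files, start)
print([f["name"] for f in recent])
```

Narrowing the list this way keeps the later loop short, since only files from the last day need a second metadata lookup.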
In the second dataset, create a string parameter named filename and use it as dynamic content in the file name, i.e. @dataset().filename.
Follow the pipeline flow below:
- Get Metadata activity 1 -> (Gets all child items that were modified in the last day)
- Filter -> (Keeps the file names that contain the string 'my_file')
- ForEach -> (Give it the Filter output array and make sure to check the Sequential checkbox)
  - Get Metadata activity 2 -> (Give it the parameterized dataset with @item().name as the parameter value, and select Last modified in the Field list)
  - If Condition -> (Checks whether the last modified value from Get Metadata 2 is greater than or equal to the mydate variable)
    - True activities
      - Set variable activity -> (Updates the mydate variable to the last modified value from Get Metadata 2)
      - Set variable activity -> (Updates the latest_filename variable to @item().name)
- Set variable activity -> (Optional; it only copies the result into the res_filename variable so it can be inspected)
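The flow above can be mirrored in a few lines of Python to make the logic easier to follow. This is a sketch of the idea, not of how ADF executes it: pick_latest, last_modified_of, and the sample timestamps are hypothetical, while the variable names and the 2022-10-20 default are taken from the pipeline.

```python
from datetime import datetime

def pick_latest(child_items, last_modified_of, initial="2022-10-20"):
    """Mimic Filter -> sequential ForEach -> If Condition -> Set variable."""
    mydate = datetime.fromisoformat(initial)  # pipeline variable mydate
    latest_filename = None                    # pipeline variable latest_filename

    # Filter activity: keep names containing 'my_file'
    candidates = [c for c in child_items if "my_file" in c["name"]]

    for item in candidates:                        # ForEach, one item at a time
        modified = last_modified_of(item["name"])  # Get Metadata 2
        if modified >= mydate:                     # If Condition
            mydate = modified                      # Set variable: mydate
            latest_filename = item["name"]         # Set variable: latest_filename
    return latest_filename

# Hypothetical child items and last modified times
items = [
    {"name": "my_file_20240111.csv"},
    {"name": "report.csv"},
    {"name": "my_file_20240112.csv"},
]
stamps = {
    "my_file_20240111.csv": datetime(2024, 1, 11, 6, 0),
    "report.csv": datetime(2024, 1, 12, 9, 0),
    "my_file_20240112.csv": datetime(2024, 1, 12, 6, 0),
}
print(pick_latest(items, stamps.__getitem__))
```

The loop has to run sequentially because mydate and latest_filename are shared pipeline variables; parallel iterations would overwrite each other's values in an unpredictable order.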
Filter activity:
Items: @activity('Get Metadata1').output.childItems
Condition: @contains(item().name, 'my_file')
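Get Metadata activity 2 inside the ForEach: pass @item().name to the dataset's filename parameter and select Last modified in the Field list.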
If activity condition:
@greaterOrEquals(ticks(activity('Get Metadata2').output.lastModified), ticks(variables('mydate')))
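The condition compares two timestamps after converting them to ticks, which are 100-nanosecond intervals counted from 0001-01-01T00:00:00. A minimal Python sketch of that comparison, assuming timezone-naive timestamps and using a hypothetical helper named ticks plus sample values:

```python
from datetime import datetime, timedelta

def ticks(timestamp):
    """Number of 100-nanosecond intervals since 0001-01-01T00:00:00."""
    elapsed = timestamp - datetime(1, 1, 1)
    # One microsecond is ten ticks
    return (elapsed // timedelta(microseconds=1)) * 10

last_modified = datetime(2024, 1, 12, 8, 30)  # hypothetical Get Metadata2 output
mydate = datetime(2022, 10, 20)               # default value of the mydate variable

print(ticks(last_modified) >= ticks(mydate))
```

Converting both sides to integers avoids comparing date strings that may be formatted differently, such as 2022-10-20 against a full timestamp with a time part.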
Update the variables mydate (to @activity('Get Metadata2').output.lastModified) and latest_filename (to @item().name) inside the True activities of the If activity.
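Result: after the ForEach finishes, the res_filename variable holds the name of the most recently modified file.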
Now add a Copy activity after this and use the above variable as the file name of the source dataset. Set your Snowflake table as the sink dataset of the Copy activity and execute the pipeline.
My Pipeline JSON for your reference:
{
    "name": "pipeline3",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "source_files",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "modifiedDatetimeStart": {
                            "value": "@adddays(utcnow(),-1)",
                            "type": "Expression"
                        },
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Filter1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Filter1').output.Value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get Metadata2",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "childfiles",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "filename": {
                                            "value": "@item().name",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "lastModified"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@greaterOrEquals(ticks(activity('Get Metadata2').output.lastModified), ticks(variables('mydate')))",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "Update date",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "policy": {
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "mydate",
                                            "value": {
                                                "value": "@activity('Get Metadata2').output.lastModified",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "Update filename",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "Update date",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "policy": {
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "latest_filename",
                                            "value": {
                                                "value": "@item().name",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "Filter1",
                "type": "Filter",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.childItems",
                        "type": "Expression"
                    },
                    "condition": {
                        "value": "@contains(item().name, 'my_file')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Res filename showing",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "res_filename",
                    "value": {
                        "value": "@variables('latest_filename')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "mydate": {
                "type": "String",
                "defaultValue": "2022-10-20"
            },
            "latest_filename": {
                "type": "String"
            },
            "res_filename": {
                "type": "String"
            }
        },
        "annotations": []
    }
}