
Azure Data Factory: How to load the most recently loaded file from Blob storage into Snowflake?


I have the following notebook job in ADF that loads an Excel/CSV file into Blob storage:

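from datetime import date
import smartsheet  # Smartsheet Python SDK
smartsheet_client = smartsheet.Smartsheet()  # token read from the SMARTSHEET_ACCESS_TOKEN env var
sheetId = '33222424'
load_time = date.today().strftime("%Y%m%d")
path = 'my_path'
# get_sheet_as_csv writes real CSV; get_sheet_as_excel would save .xlsx content under a .csv name
smartsheet_client.Sheets.get_sheet_as_csv(sheetId, path, 'my_file_' + load_time + '.csv')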

The code above creates a CSV file whose name contains the current date and writes it to the given path in Blob storage.

Next, I want to load this file into a Snowflake table. Because the Blob storage contains many other files, I want to load only the most recently created file into Snowflake.

Output file name in Blob storage: my_file_20240112.csv

The file contains data like this:

id  | Name
101 | John
102 | Hyden
103 | Vivek 

Which activity can I use to load the latest file from the blob path?

What is the best approach for this?


Solution

  • Because you want the latest file, judged by both the date in the file name and the load time, you need a loop to find the required file name.
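The pipeline below picks the file by its last-modified time. You can also rank files by the date embedded in the name, and that rule is easy to sketch in plain Python. This is only an illustration, not ADF code. It assumes names follow the question's `my_file_YYYYMMDD.csv` pattern. `NAME_PATTERN`, `date_from_name` and `latest_by_name` are hypothetical helpers.

```python
import re
from datetime import datetime
from typing import Iterable, Optional

# Assumed naming pattern, taken from the question's example: my_file_YYYYMMDD.csv
NAME_PATTERN = re.compile(r"^my_file_(\d{8})\.csv$")


def date_from_name(name: str) -> Optional[datetime]:
    """Return the date embedded in the file name, or None if the name does not fit the pattern."""
    match = NAME_PATTERN.match(name)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y%m%d")
    except ValueError:  # eight digits that are not a real date, e.g. 20241399
        return None


def latest_by_name(names: Iterable[str]) -> Optional[str]:
    """Return the name with the newest embedded date; names without one are ignored."""
    dated = [(date_from_name(name), name) for name in names]
    candidates = [(d, name) for d, name in dated if d is not None]
    return max(candidates)[1] if candidates else None


print(latest_by_name(["my_file_20240110.csv", "other_report.csv", "my_file_20240112.csv"]))
# my_file_20240112.csv
```

A name-based rule stays stable even if a file is re-uploaded later. The last-modified rule used in the pipeline also catches a file that was regenerated on the same day.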

    First, create the following String variables in the pipeline: mydate (default value 2022-10-20), latest_filename, and res_filename.

    Next, create two datasets: one to get the child items of the input folder, and another to read the individual files inside the loop.

    In the first dataset, set the path only down to the folder that holds the source files, and use this dataset in the Get Metadata activity. In the Get Metadata activity, under Filter by last modified, set the Start time (UTC) to the dynamic content @adddays(utcnow(),-1). This returns only the files modified in the last 24 hours.
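The last-modified filter keeps only recent files. Here is a rough Python equivalent of what it selects, under stated assumptions. `files` is a hypothetical list of `(name, last_modified_utc)` pairs standing in for the blobs in the folder. The comparison is inclusive at the start, in line with how `modifiedDatetimeStart` is documented for ADF file-based connectors.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


def modified_in_last_day(
    files: List[Tuple[str, datetime]], now: Optional[datetime] = None
) -> List[str]:
    """Keep names whose last-modified time is on or after now - 1 day.

    Mirrors the intent of modifiedDatetimeStart = @adddays(utcnow(),-1).
    `files` holds hypothetical (name, timezone-aware UTC last_modified) pairs.
    """
    now = now or datetime.now(timezone.utc)
    start = now - timedelta(days=1)  # same window as adddays(utcnow(), -1)
    return [name for name, modified in files if modified >= start]
```

This window is a rolling 24 hours. If the pipeline runs more than a day after the notebook wrote the file, Get Metadata1 returns no matching child items, and the loop never sets latest_filename.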

    In the second dataset, create a String parameter named filename and use it as dynamic content for the file name (@dataset().filename).
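A parameterized dataset definition might look roughly like the one below, written as a Python dict so it can be inspected or dumped to JSON. Only the dataset name `childfiles`, the `filename` parameter and `@dataset().filename` come from this answer. The linked service `AzureDataLakeStorage1`, file system `input` and folder `my_path` are hypothetical placeholders. The location type `AzureBlobFSLocation` matches the ADLS Gen2 store settings used in the pipeline; a plain Blob storage dataset would use `AzureBlobStorageLocation` with a `container` instead.

```python
import json

# Hypothetical names: linked service "AzureDataLakeStorage1", file system "input",
# folder "my_path". The filename parameter and @dataset().filename follow the steps above.
childfiles_dataset = {
    "name": "childfiles",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureDataLakeStorage1",
            "type": "LinkedServiceReference",
        },
        "parameters": {"filename": {"type": "string"}},
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "fileSystem": "input",
                "folderPath": "my_path",
                # Resolved per ForEach iteration from the value passed in (@item().name)
                "fileName": {"value": "@dataset().filename", "type": "Expression"},
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": True,
        },
    },
}

print(json.dumps(childfiles_dataset, indent=2))
```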

    Use the following pipeline flow:

    - Get Metadata1 -> (gets all child items modified in the last day)
    - Filter1 -> (keeps only the file names that contain the string 'my_file')
    - ForEach1 -> (iterates over the Filter output array; make sure the Sequential checkbox is checked)
        - Get Metadata2 -> (uses the parameterized dataset with @item().name as the filename value; field list: Last modified)
        - If Condition1 -> (checks whether the last modified value from Get Metadata2 is greater than or equal to the mydate variable)
            - True activities:
                - Set variable -> (updates mydate to the last modified value from Get Metadata2)
                - Set variable -> (updates latest_filename to @item().name)
    - Set variable -> (optional; only shows the result in res_filename)
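The flow above behaves like a running maximum over last-modified times. The following Python sketch simulates that logic; it is not how ADF executes it. `child_items` mimics the shape of Get Metadata1's `childItems`. `last_modified` is a hypothetical stand-in for what Get Metadata2 would return for each file.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def to_utc(value: str) -> datetime:
    """Parse an ISO 8601 string; date-only or naive values are treated as UTC."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def latest_file(child_items: List[Dict[str, str]], last_modified: Dict[str, str],
                mydate: str = "2022-10-20") -> Optional[str]:
    """Simulate Filter1 -> ForEach1 (sequential) -> If Condition1 -> Set variable.

    child_items mimics Get Metadata1's childItems; last_modified stands in for
    Get Metadata2's lastModified per file (both are hypothetical inputs).
    """
    latest_filename = None
    # Filter1: @contains(item().name, 'my_file')
    filtered = [item for item in child_items if "my_file" in item["name"]]
    # ForEach1 with isSequential = true: one file at a time
    for item in filtered:
        modified = last_modified[item["name"]]
        # If Condition1: greaterOrEquals(ticks(lastModified), ticks(mydate))
        if to_utc(modified) >= to_utc(mydate):
            mydate = modified               # Set variable "mydate"
            latest_filename = item["name"]  # Set variable "latest_filename"
    return latest_filename


items = [
    {"name": "my_file_20240111.csv", "type": "File"},
    {"name": "notes.txt", "type": "File"},
    {"name": "my_file_20240112.csv", "type": "File"},
]
mods = {
    "my_file_20240111.csv": "2024-01-11T06:00:00Z",
    "my_file_20240112.csv": "2024-01-12T06:00:00Z",
    "notes.txt": "2024-01-12T07:00:00Z",
}
print(latest_file(items, mods))  # my_file_20240112.csv
```

The Sequential setting matters because mydate and latest_filename are pipeline-level variables. If the iterations ran in parallel, several Set variable activities could race on them, and the final value would not be guaranteed to belong to the newest file.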
    

    Filter activity:

    Items: @activity('Get Metadata1').output.childItems
    Condition: @contains(item().name, 'my_file')
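`contains` is a plain substring match. It also keeps names like `copy_of_my_file_...xlsx` or a folder called `my_file_archive`, because `childItems` lists folders too. The sketch below shows the difference in Python. The stricter ADF expression in the comment is an optional variant, not part of the original answer.

```python
from typing import Dict, List

# Hypothetical childItems, in the shape Get Metadata returns: {"name": ..., "type": "File" | "Folder"}
child_items: List[Dict[str, str]] = [
    {"name": "my_file_20240112.csv", "type": "File"},
    {"name": "my_file_archive", "type": "Folder"},
    {"name": "copy_of_my_file_20240101.xlsx", "type": "File"},
    {"name": "report.csv", "type": "File"},
]


def contains_filter(items: List[Dict[str, str]]) -> List[str]:
    # Equivalent of: @contains(item().name, 'my_file')
    return [i["name"] for i in items if "my_file" in i["name"]]


def strict_filter(items: List[Dict[str, str]]) -> List[str]:
    # Stricter variant (assumption, not in the answer):
    # @and(equals(item().type, 'File'), and(startsWith(item().name, 'my_file_'), endsWith(item().name, '.csv')))
    return [
        i["name"] for i in items
        if i["type"] == "File" and i["name"].startswith("my_file_") and i["name"].endswith(".csv")
    ]


print(contains_filter(child_items))
print(strict_filter(child_items))  # ['my_file_20240112.csv']
```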
    


    Get Metadata2 inside the ForEach uses the childfiles dataset with filename set to @item().name, and Last modified in its field list.

    If activity condition:

    @greaterOrEquals(ticks(activity('Get Metadata2').output.lastModified), ticks(variables('mydate')))
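The condition compares `ticks()` values rather than raw strings. Ticks are 100-nanosecond intervals since 0001-01-01, so both timestamps become integers regardless of format. That is presumably why a date-only default such as `2022-10-20` can be compared with a full timestamp. Below is a Python approximation of `ticks()`. It assumes UTC when no offset is given and handles the plain ISO forms shown here. Python datetimes only carry microseconds, so sub-microsecond digits are not represented.

```python
from datetime import datetime, timedelta, timezone

# .NET-style ticks: 100-nanosecond intervals since 0001-01-01T00:00:00 (UTC here)
_TICKS_EPOCH = datetime(1, 1, 1, tzinfo=timezone.utc)


def ticks(value: str) -> int:
    """Approximate ADF's ticks() for an ISO 8601 string (assumed UTC if no offset)."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    # whole microseconds since the epoch, times 10 to get 100 ns units
    return (parsed - _TICKS_EPOCH) // timedelta(microseconds=1) * 10


print(ticks("1970-01-01T00:00:00Z"))  # 621355968000000000
print(ticks("2024-01-12T06:00:00Z") >= ticks("2022-10-20"))  # True
```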
    

    Inside the True activities of the If Condition, set mydate to @activity('Get Metadata2').output.lastModified, then set latest_filename to @item().name.

    Result: after the run, res_filename holds the name of the most recently modified my_file file.

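    Finally, add a Copy activity after this and pass the latest_filename variable (@variables('latest_filename')) as the filename parameter of the source dataset. Use your Snowflake table as the sink dataset of this Copy activity, then run the pipeline. Note that ADF copies directly into Snowflake only when the source linked service is Azure Blob storage with SAS authentication; otherwise, enable staged copy on the Copy activity.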
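For orientation, here is a rough sketch of such a Copy activity, written as a Python dict that mirrors the activity JSON. Several names are hypothetical: the activity name, the Snowflake dataset `SnowflakeTable`, and the staging linked service `StagingBlobSAS`. It assumes the original Snowflake connector (`SnowflakeSink` with `SnowflakeImportCopyCommand`); the newer Snowflake V2 connector uses `SnowflakeV2Sink` instead. Staging is enabled because the source here is ADLS Gen2 (`AzureBlobFSReadSettings`), which does not qualify for direct copy.

```python
import json

# Hypothetical names: "Copy latest file to Snowflake", "SnowflakeTable", "StagingBlobSAS".
copy_activity = {
    "name": "Copy latest file to Snowflake",
    "type": "Copy",
    "dependsOn": [{"activity": "ForEach1", "dependencyConditions": ["Succeeded"]}],
    "typeProperties": {
        "source": {
            "type": "DelimitedTextSource",
            "storeSettings": {"type": "AzureBlobFSReadSettings"},
            "formatSettings": {"type": "DelimitedTextReadSettings"},
        },
        "sink": {
            "type": "SnowflakeSink",
            "importSettings": {"type": "SnowflakeImportCopyCommand"},
        },
        # Staged copy: the source is not Blob storage with SAS auth
        "enableStaging": True,
        "stagingSettings": {
            "linkedServiceName": {"referenceName": "StagingBlobSAS", "type": "LinkedServiceReference"}
        },
    },
    "inputs": [
        {
            "referenceName": "childfiles",
            "type": "DatasetReference",
            # The file picked by the loop is passed into the parameterized dataset
            "parameters": {
                "filename": {"value": "@variables('latest_filename')", "type": "Expression"}
            },
        }
    ],
    "outputs": [{"referenceName": "SnowflakeTable", "type": "DatasetReference"}],
}

print(json.dumps(copy_activity, indent=2))
```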

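    My pipeline JSON, for reference (my datasets point to ADLS Gen2, hence AzureBlobFSReadSettings; for a plain Blob storage dataset the store settings type would be AzureBlobStorageReadSettings):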

    {
        "name": "pipeline3",
        "properties": {
            "activities": [
                {
                    "name": "Get Metadata1",
                    "type": "GetMetadata",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "source_files",
                            "type": "DatasetReference"
                        },
                        "fieldList": [
                            "childItems"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "modifiedDatetimeStart": {
                                "value": "@adddays(utcnow(),-1)",
                                "type": "Expression"
                            },
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Filter1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Filter1').output.Value",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Get Metadata2",
                                "type": "GetMetadata",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "dataset": {
                                        "referenceName": "childfiles",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "filename": {
                                                "value": "@item().name",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    "fieldList": [
                                        "lastModified"
                                    ],
                                    "storeSettings": {
                                        "type": "AzureBlobFSReadSettings",
                                        "enablePartitionDiscovery": false
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextReadSettings"
                                    }
                                }
                            },
                            {
                                "name": "If Condition1",
                                "type": "IfCondition",
                                "dependsOn": [
                                    {
                                        "activity": "Get Metadata2",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "userProperties": [],
                                "typeProperties": {
                                    "expression": {
                                        "value": "@greaterOrEquals(ticks(activity('Get Metadata2').output.lastModified), ticks(variables('mydate')))",
                                        "type": "Expression"
                                    },
                                    "ifTrueActivities": [
                                        {
                                            "name": "Update date",
                                            "type": "SetVariable",
                                            "dependsOn": [],
                                            "policy": {
                                                "secureOutput": false,
                                                "secureInput": false
                                            },
                                            "userProperties": [],
                                            "typeProperties": {
                                                "variableName": "mydate",
                                                "value": {
                                                    "value": "@activity('Get Metadata2').output.lastModified",
                                                    "type": "Expression"
                                                }
                                            }
                                        },
                                        {
                                            "name": "Update filename",
                                            "type": "SetVariable",
                                            "dependsOn": [
                                                {
                                                    "activity": "Update date",
                                                    "dependencyConditions": [
                                                        "Succeeded"
                                                    ]
                                                }
                                            ],
                                            "policy": {
                                                "secureOutput": false,
                                                "secureInput": false
                                            },
                                            "userProperties": [],
                                            "typeProperties": {
                                                "variableName": "latest_filename",
                                                "value": {
                                                    "value": "@item().name",
                                                    "type": "Expression"
                                                }
                                            }
                                        }
                                    ]
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Filter1",
                    "type": "Filter",
                    "dependsOn": [
                        {
                            "activity": "Get Metadata1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Get Metadata1').output.childItems",
                            "type": "Expression"
                        },
                        "condition": {
                            "value": "@contains(item().name, 'my_file')",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "Res filename showing",
                    "type": "SetVariable",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "res_filename",
                        "value": {
                            "value": "@variables('latest_filename')",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "variables": {
                "mydate": {
                    "type": "String",
                    "defaultValue": "2022-10-20"
                },
                "latest_filename": {
                    "type": "String"
                },
                "res_filename": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
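The activities in this JSON are not listed in execution order: ForEach1 appears before Filter1. ADF runs them according to `dependsOn`. The small Python sketch below reads a trimmed copy of the JSON, keeping only names and dependencies, and makes the actual order explicit. `execution_order` is a hypothetical helper built on the standard library's `graphlib`, which requires Python 3.9+.

```python
import json
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Trimmed copy of the pipeline JSON above: only activity names and dependsOn are kept.
PIPELINE_JSON = """
{
  "name": "pipeline3",
  "properties": {
    "activities": [
      {"name": "Get Metadata1", "dependsOn": []},
      {"name": "ForEach1", "dependsOn": [{"activity": "Filter1", "dependencyConditions": ["Succeeded"]}]},
      {"name": "Filter1", "dependsOn": [{"activity": "Get Metadata1", "dependencyConditions": ["Succeeded"]}]},
      {"name": "Res filename showing", "dependsOn": [{"activity": "ForEach1", "dependencyConditions": ["Succeeded"]}]}
    ]
  }
}
"""


def execution_order(pipeline: dict) -> list:
    """Order activities so that each one comes after the activities it depends on."""
    activities = pipeline["properties"]["activities"]
    graph = {
        activity["name"]: {dep["activity"] for dep in activity.get("dependsOn", [])}
        for activity in activities
    }
    return list(TopologicalSorter(graph).static_order())


print(execution_order(json.loads(PIPELINE_JSON)))
# ['Get Metadata1', 'Filter1', 'ForEach1', 'Res filename showing']
```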