azure, azure-synapse, azure-data-lake-gen2

Azure Synapse, Pipelines and Azure Data Lake


I have an ETL pipeline that is triggered when a file is uploaded to the "Processing" folder in my Data Lake. Once the data is processed, I want to move the file to a different location in the Data Lake, the "Processed" folder. How can I do that?


Solution

  • To move a file from the Processing folder to the Processed folder in your Data Lake, you can add further activities after the copy activity that your storage event trigger already runs. An image of a similar approach is provided below.

    [Image: pipeline with the copy activities ‘from_dl_to_db’ and ‘to_processed’ followed by the delete activity ‘delete_processing’]

    From the above image, ‘from_dl_to_db’ is a copy activity that moves the file from the Data Lake Processing folder to a SQL Server table. The ‘to_processed’ copy activity then moves the file from the Processing folder to the Processed folder. Finally, ‘delete_processing’ deletes the file from the Processing folder, since its data has already been loaded into the SQL Server table and a copy is now available in the Processed folder.
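    As a rough sketch, the JSON behind such a pipeline (what Synapse Studio shows in the pipeline's code view) chains the three activities with ‘Succeeded’ dependencies. The pipeline name below is a placeholder and the per-activity settings are omitted:

    ```json
    {
      "name": "pl_processing_to_processed",
      "properties": {
        "activities": [
          { "name": "from_dl_to_db", "type": "Copy" },
          { "name": "to_processed", "type": "Copy",
            "dependsOn": [ { "activity": "from_dl_to_db", "dependencyConditions": [ "Succeeded" ] } ] },
          { "name": "delete_processing", "type": "Delete",
            "dependsOn": [ { "activity": "to_processed", "dependencyConditions": [ "Succeeded" ] } ] }
        ]
      }
    }
    ```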

    If you know the file name before uploading, you could set it manually in the pipeline each time before uploading and triggering. However, that takes a lot of time and is not efficient, so you should use the Get Metadata activity, which retrieves the file names for you. The article below demonstrates how to use the Get Metadata activity:

    https://www.c-sharpcorner.com/article/extract-file-names-and-copy-from-source-path-in-azure-data-factory/

    As that article explains, we must use the 'Child items' field because we are working with the Processing folder in the Data Lake and do not know in advance the names of the files that will be uploaded to it. Create a ForEach activity to loop through the contents of the Processing folder (following the same process described in the link); a sketch of both activities is shown below.
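    A Get Metadata activity that requests child items and a ForEach activity that iterates over them would look roughly like this; the activity and dataset names are illustrative, not taken from the original pipeline:

    ```json
    {
      "name": "Get Metadata1",
      "type": "GetMetadata",
      "description": "List the files currently in the Processing folder",
      "typeProperties": {
        "dataset": { "referenceName": "ds_processing_folder", "type": "DatasetReference" },
        "fieldList": [ "childItems" ]
      }
    },
    {
      "name": "ForEach1",
      "type": "ForEach",
      "description": "Run the copy/copy/delete steps once per file",
      "dependsOn": [ { "activity": "Get Metadata1", "dependencyConditions": [ "Succeeded" ] } ],
      "typeProperties": {
        "items": { "value": "@activity('Get Metadata1').output.childItems", "type": "Expression" },
        "activities": [ ]
      }
    }
    ```

    Inside the loop, the current file is available as @item(), so @item().name returns its name.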

    Inside the ForEach activity, open the Activities tab and create the activities for the next steps. First, create the copy activity that moves the file from the Data Lake to a SQL Server table. For the source dataset, follow the same steps provided in the link. For the sink dataset, create a linked service to your database, open the dataset, and add a parameter under the Parameters tab. Then move to the Connection tab and reference that parameter in the table name field, as shown below.

    [Image: sink dataset Connection tab with the table name field set to the 'table_name' parameter]

    In the copy activity's sink settings, check the 'Auto create table' option and provide a value for the 'table_name' parameter, as shown below and in the sketch that follows.

    [Image: copy activity sink settings with 'Auto create table' selected and a value supplied for 'table_name']
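    A minimal sketch of this step, assuming the uploaded file name is reused as the table name and that the files are delimited text; the dataset, linked service, and parameter names are illustrative, and some required store/format settings are omitted:

    ```json
    {
      "name": "ds_sql_table",
      "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": { "referenceName": "ls_azure_sql", "type": "LinkedServiceReference" },
        "parameters": { "table_name": { "type": "string" } },
        "typeProperties": {
          "tableName": { "value": "@dataset().table_name", "type": "Expression" }
        }
      }
    }
    ```

    ```json
    {
      "name": "from_dl_to_db",
      "type": "Copy",
      "description": "Load the current file into a SQL table named after it",
      "typeProperties": {
        "source": { "type": "DelimitedTextSource" },
        "sink": { "type": "AzureSqlSink", "tableOption": "autoCreate" }
      },
      "inputs": [ { "referenceName": "ds_processing_file", "type": "DatasetReference",
                    "parameters": { "file_name": "@item().name" } } ],
      "outputs": [ { "referenceName": "ds_sql_table", "type": "DatasetReference",
                     "parameters": { "table_name": "@item().name" } } ]
    }
    ```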

    Create another copy activity to move the file from the Processing folder to the Processed folder in the Data Lake. For the source (the Processing folder), set the parameter value the same way you did for the source of the ‘from_dl_to_db’ activity, and point the sink at the Processed folder; a sketch follows.
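    Roughly, again assuming delimited text files and illustrative dataset names:

    ```json
    {
      "name": "to_processed",
      "type": "Copy",
      "description": "Copy the current file from Processing to Processed",
      "typeProperties": {
        "source": { "type": "DelimitedTextSource" },
        "sink": { "type": "DelimitedTextSink" }
      },
      "inputs": [ { "referenceName": "ds_processing_file", "type": "DatasetReference",
                    "parameters": { "file_name": "@item().name" } } ],
      "outputs": [ { "referenceName": "ds_processed_file", "type": "DatasetReference",
                     "parameters": { "file_name": "@item().name" } } ]
    }
    ```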

    Create the final Delete activity (deleting is necessary because we are using a ForEach loop, and the file would be copied to the database again on the next run if it were not deleted). Use the same dynamic parameter here as in the source settings of the two previous copy activities; see the sketch below.
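    A sketch of that Delete activity, reusing the same parameterized source dataset:

    ```json
    {
      "name": "delete_processing",
      "type": "Delete",
      "description": "Remove the current file from Processing after it has been copied",
      "dependsOn": [ { "activity": "to_processed", "dependencyConditions": [ "Succeeded" ] } ],
      "typeProperties": {
        "dataset": { "referenceName": "ds_processing_file", "type": "DatasetReference",
                     "parameters": { "file_name": "@item().name" } },
        "enableLogging": false
      }
    }
    ```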

    Since you already have a storage event trigger, attach it to this pipeline and run it. As soon as you upload a file, it is copied to the database as a table, then copied to the Processed folder to indicate that it has been processed, and finally deleted from the Processing folder. The only drawback of this method is that uploading multiple files to the Processing (source) folder triggers the pipeline once per file, and those runs might fail. So upload only one file at a time and the pipeline works without issue.
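    For completeness, a storage event trigger scoped to the Processing folder is defined roughly as follows; the trigger name, container name, and pipeline name are placeholders:

    ```json
    {
      "name": "trg_processing_blob_created",
      "properties": {
        "type": "BlobEventsTrigger",
        "typeProperties": {
          "blobPathBeginsWith": "/<container>/blobs/Processing/",
          "ignoreEmptyBlobs": true,
          "events": [ "Microsoft.Storage.BlobCreated" ]
        },
        "pipelines": [
          { "pipelineReference": { "referenceName": "pl_processing_to_processed", "type": "PipelineReference" } }
        ]
      }
    }
    ```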

    My output image links:

    upload file to processing folder: https://i.sstatic.net/RqeAY.png

    pipeline execution: https://i.sstatic.net/RnJWu.png

    file copied to database as table: https://i.sstatic.net/HLaID.png

    file moved to processed folder: https://i.sstatic.net/Pn82O.png

    file deleted from processing folder: https://i.sstatic.net/wHHgb.png