Search code examples
excelazureazure-sql-databaseazure-data-factoryazure-blob-storage

Azure blob-to-SQL automatic copy


I am new to Azure and I am trying to understand how to do my task and which services I need.

My case is this one: I have some excel files that will be uploaded to BLOB storage. The excel files will all have the same structure. I need to trigger a pipeline to copy the data into a SQL database every time a new file is uploaded. Then, I need to read the excel and select a different sink for each row, where the table name is given by the value in a specific column of the excel.

So, to sum up what I need to know:

  • I tried DataFactory but I don't know how to create a pipeline that has a dynamic source (the newly uploaded file).
  • I don't know how to trigger the pipeline (and maybe get the filename from the trigger?)
  • I need to read the excel line by line and select a different sink for each line

I don't know if the services I tried are the best for this task, but I do not have any constraint so feel free to suggest additional or different ones.

Thanks


Solution

  • You need to use Dataset parameters and pipeline parameters to pass the file name from storage event trigger to the pipeline dynamically.

    The below approach will only work when you know the excel file structure, data types of the target tables and the number of rows of the excel file should be below 5000.

    First go to pipeline parameters section and create a pipeline parameter getfilename of string type without any default value. This file name from the trigger will be passed to this pipeline parameter and then this will used when creating dynamic datasets.

    You can go through this documentation to know how to create a storage event trigger. While creating, give the required details like storage account name and container name and give .xlsx in the Blob path ends with option. At the end, pass the trigger parameter @triggerBody().fileName to the pipeline parameter. You can also check this SO answer to know how to pass the values to the dataset parameters from trigger parameters.

    Now, create 3 datasets in which two are for excel files and one is for the SQL table.

    lookup_excel dataset:

    Go to the parameters section in this dataset and create a parameter filename of string type without any default value. Then, use that parameter in the file name of the dataset like @dataset().filename and give the below configurations as shown below. Also, go to schema, and clear the schema if it has any.

    enter image description here

    This dataset will be used with a lookup activity to get all the table names from each row. The lookup activity limitation is 5000 rows only.

    Excel1 dataset:

    In this dataset create two parameters filename and range_value of string type without any default values and use those in the dataset like below with the following configurations.

    file name - @dataset().filename
    Range - A@{dataset().range_value}:D@{dataset().range_value}
    

    enter image description here

    This dataset will be given to a copy activity inside a loop and file name and Range parameter values will be passed through a loop so that it extracts each row in each iteration.

    SQL sink dataset:

    Create table_name parameter as same as above.

    enter image description here

    The table name value will be given in a loop.

    Now, in the pipeline take a look up activity and give the lookup_excel dataset. It will ask to provide value to the dataset parameter filename. Pass the pipeline parameter @pipeline().parameters.getfilename to this.

    enter image description here

    Then, take a For-each activity and give the below expression.

    @range(0,length(activity('Lookup1').output.value))
    

    enter image description here

    Inside, for-each activity, take copy activity with Excel1 dataset as source and SQL dataset as sink. In the source, pass the below values to the dataset parameters.

    range_value - @string(add(item(),2))
    filename - @pipeline().parameters.getfilename
    

    enter image description here

    In the copy activity sink, the target table in each iteration needs to be created dynamically if not exists.

    Use the below expression in the Pre-copy script of the copy activity.

    IF OBJECT_ID(N'dbo.@{activity('Lookup1').output.value[item()].table_name}', N'U') IS NULL
    CREATE TABLE dbo.@{activity('Lookup1').output.value[item()].table_name} (id int,name varchar(50),table_name varchar(50),age int);
    

    Here, you need to change the script as per your required schema.

    Pass the below expression to the dataset parameter table_name.

    @activity('Lookup1').output.value[item()].table_name
    

    enter image description here

    You can use Auto create table option but it will create target tables data types as per the excel file which comes as only strings by default.

    Now, you need to set the mapping of the source and target columns. Go to mapping-> click on Add dynamic content and give the below expression.

    @json('{
                                        "type": "TabularTranslator",
                                        "mappings": [
                                            {
                                                "source": {
                                                    "type": "String",
                                                    "ordinal": 1
                                                },
                                                "sink": {
                                                    "name": "id",
                                                    "physicalType": "Int32"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "type": "String",
                                                    "ordinal": 2
                                                },
                                                "sink": {
                                                    "name": "name",
                                                    "physicalType": "nvarchar"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "type": "String",
                                                    "ordinal": 3
                                                },
                                                "sink": {
                                                    "name": "table_name",
                                                    "physicalType": "nvarchar"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "type": "String",
                                                    "ordinal": 4
                                                },
                                                "sink": {
                                                    "name": "age",
                                                    "physicalType": "Int32"
                                                }
                                            }
                                        ],
                                        "typeConversion": true,
                                        "typeConversionSettings": {
                                            "allowDataTruncation": true,
                                            "treatBooleanAsNumber": false
                                        }
                                    }')
    
    

    In the above expression, you need to modify the column names and data types of the sink as per your target tables schema. You can add the columns as well by including new objects in the above JSON.

    enter image description here

    Now, publish the trigger and the pipeline. Whenever a new file uploaded, or any file modified, the pipeline will be triggered, and the required tables will be created in the target.

    enter image description here