I have multiple CSV files without column names in Blob Storage. Using a Get Metadata activity I get the list of files, and I want to load each file into its respective table in Azure SQL using a Copy Data activity in a loop (each table has a different schema). Below are two sample files. I have tried this with a single file; I want to do it for multiple files in a loop.
**sample1.txt**
5|1|300|100| |101|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-06
5|1|300|100| |102|809|6|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-14
5|1|300|100| |103|809|-1|-1|-1|0|0|0|0|0|0|0|0|2|0|0|0|0|1|0|0||2020-07-05
5|1|300|100| |104|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-05
5|1|300|100| |105|809|-5|-5|-5|0|0|0|0|0|0|0|0|2|0|0|0|0|5|0|0||2021-08-18
5|1|300|100| |106|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |107|809|8|8|8|0|0|0|0|0|0|0|0|2|0|0|0|0|-8|0|0||2020-07-14
5|1|300|100| |108|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |109|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-14
5|1|300|100| |111|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-07
5|1|300|100| |112|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-05
5|1|300|100| |114|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-08
**sample2.txt**
5|1|300|100| |131|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |132|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-08
5|1|300|100| |135|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |136|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-08
5|1|300|100| |138|809|5|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-08
5|1|300|100| |139|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-07
5|1|300|100| |140|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |142|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |143|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-14
5|1|300|100| |146|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
For a single headerless source file, you can do the mapping manually. For multiple files, the mapping has to be done dynamically, regardless of headers, and building the schema dynamically for every file is difficult with a Copy activity. Instead, use a data flow, which maps the schema to the target table dynamically.
If all of your target SQL tables are already created, you need to loop through both the table names array and the Get Metadata file list array at the same time.
First, make sure the table names array is in the same order as the file names array. Then iterate through both arrays in the same ForEach loop. Go through this SO answer to see how to loop through two arrays of the same length at the same time.
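One common way to walk two equal-length arrays in a single ForEach is to iterate over index positions and use the index to read from both arrays. The following is a minimal sketch under stated assumptions: the pipeline array parameter `table_names` is hypothetical and must be ordered to match the file list, while the `dataflow2` data flow and its dataset parameter names are reused from the JSON later in this answer.

```json
{
    "name": "ForEachIndex",
    "description": "Sketch only: iterates over positions 0..n-1; 'table_names' is a hypothetical pipeline parameter",
    "type": "ForEach",
    "dependsOn": [
        {
            "activity": "Get Metadata1",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ],
    "typeProperties": {
        "items": {
            "value": "@range(0, length(activity('Get Metadata1').output.childItems))",
            "type": "Expression"
        },
        "isSequential": true,
        "activities": [
            {
                "name": "Data flow2",
                "type": "ExecuteDataFlow",
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow2",
                        "type": "DataFlowReference",
                        "datasetParameters": {
                            "source1": {
                                "filename": {
                                    "value": "@activity('Get Metadata1').output.childItems[item()].name",
                                    "type": "Expression"
                                }
                            },
                            "sink1": {
                                "table_name": {
                                    "value": "@pipeline().parameters.table_names[item()]",
                                    "type": "Expression"
                                }
                            }
                        }
                    }
                }
            }
        ]
    }
}
```

Inside this loop `item()` is an integer index rather than a file object, so the file name and the table name are both looked up by position.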
Or, if your target table names are the same as your file names (filename.txt), there is no need to iterate over another array. You can extract the table names from the file names themselves. I am following this approach.
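The rest of this answer extracts the table name with `split(item().name,'.')[0]`, which keeps only the text before the first dot, so a file named `sales.2020.txt` would map to a table named `sales`. If file names can contain extra dots, a variant that strips only the last extension may be safer. Below is a sketch of the sink dataset parameter value, assuming every file name contains at least one dot (otherwise `lastIndexOf` returns -1 and `substring` fails):

```json
{
    "table_name": {
        "value": "@substring(item().name, 0, lastIndexOf(item().name, '.'))",
        "type": "Expression"
    }
}
```

If this variant is used, the same expression would have to replace the `split` expression everywhere the table name is derived, including the Lookup query.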
First, pass the Get Metadata activity's childItems array to the ForEach activity's Items setting.
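`childItems` lists everything in the folder, including subfolders and any files that are not data files. If that is a concern, a Filter activity can sit between Get Metadata and ForEach. This is a minimal sketch in which the activity name `Filter1` and the `.txt` suffix check are assumptions based on the sample file names:

```json
{
    "name": "Filter1",
    "description": "Sketch only: keeps file entries ending in .txt; name and suffix are assumptions",
    "type": "Filter",
    "dependsOn": [
        {
            "activity": "Get Metadata1",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ],
    "typeProperties": {
        "items": {
            "value": "@activity('Get Metadata1').output.childItems",
            "type": "Expression"
        },
        "condition": {
            "value": "@and(equals(item().type, 'File'), endswith(item().name, '.txt'))",
            "type": "Expression"
        }
    }
}
```

With this in place, the ForEach Items setting would be `@activity('Filter1').output.value` instead of the raw `childItems` array.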
Create datasets for the source CSV file and the target SQL table. To iterate through files and tables, use dataset parameters: create string parameters and use them in the dynamic content for the file name and the table name.
Source csv dataset with dataset parameter for the filename:
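A minimal sketch of what such a source dataset definition could look like. The dataset name `source_csvs`, the parameter name `filename`, and the ADLS Gen2 (`AzureBlobFS`) storage type come from the JSON later in this answer; the linked service name and the `data` container are hypothetical. The pipe delimiter and the missing header row match the sample files.

```json
{
    "name": "source_csvs",
    "properties": {
        "description": "Sketch only: linked service and container names are hypothetical",
        "linkedServiceName": {
            "referenceName": "AzureDataLakeStorage1",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "filename": {
                "type": "string"
            }
        },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "fileName": {
                    "value": "@dataset().filename",
                    "type": "Expression"
                },
                "fileSystem": "data"
            },
            "columnDelimiter": "|",
            "firstRowAsHeader": false
        },
        "schema": []
    }
}
```

Leaving `schema` empty keeps the dataset generic, so the same definition can serve files with different column counts.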
Target SQL table dataset with dataset parameter for the table name:
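A minimal sketch of what such a target dataset definition could look like. The dataset name `AzureSqlTable1` and the parameter name `table_name` come from the JSON later in this answer; the linked service name and the `dbo` schema are assumptions.

```json
{
    "name": "AzureSqlTable1",
    "properties": {
        "description": "Sketch only: linked service name and dbo schema are assumptions",
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "table_name": {
                "type": "string"
            }
        },
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "schema": "dbo",
            "table": {
                "value": "@dataset().table_name",
                "type": "Expression"
            }
        }
    }
}
```

The same dataset is used as a data flow source, as the sink, and by the Lookup activity, which is why the table name is a parameter rather than a fixed value.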
Here, divide the process into two parts: one handles copying data into tables that already exist, and the other copies data into a new table that is created on the fly.
To do this, first use the query below in a Lookup activity inside the ForEach to check whether the current table already exists.
IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL
SELECT 1 AS res ELSE SELECT 0 AS res;
Use the expression below as the condition of an If Condition activity.
@equals(activity('Lookup1').output.firstRow.res, 1)
Now, use two data flows: one for the tables that already exist and another for new tables.
For the data flow that handles existing tables, add the header to the CSV data using a union transformation. First take two sources: the target table dataset and the source CSV dataset. To read only the column names from the target table, without any rows, use the expression below as the source query; its WHERE clause is always false, so the query returns the table's schema and no data. Create a data flow parameter of string type named table_name and use it in the expression.
"select * from {$table_name} where 'Rakesh'='Laddu'"
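Do the union by position, and add the same target table dataset as the sink of the data flow.
In the pipeline, add a Data flow activity for this data flow to the If Condition's True activities and pass the dataset parameters: @item().name for the file name and @split(item().name,'.')[0] for the table name.
Also pass the table name to the data flow parameter table_name in the same Data flow activity. In the pipeline JSON below it is passed as a quoted string, '@{split(item().name,'.')[0]}', because the value is handed to the data flow as a string literal.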
For the new tables, create another data flow with the CSV dataset as the source and the SQL target table dataset as the sink. In the sink settings, check the Recreate table option.
Add this data flow to the False activities of the If Condition and pass the same dataset parameter values as above.
Now, execute the pipeline. For existing tables, the data will be inserted; for tables that do not exist yet, the respective tables will be created with ADF's default column names.
Data flow JSON for the existing tables:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                },
                {
                    "dataset": {
                        "referenceName": "source_csvs",
                        "type": "DatasetReference"
                    },
                    "name": "source2"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "union1"
                }
            ],
            "scriptLines": [
                "parameters{",
                " table_name as string",
                "}",
                "source(allowSchemaDrift: true,",
                " validateSchema: false,",
                " isolationLevel: 'READ_UNCOMMITTED',",
                " query: (\"select * from {$table_name} where 'Rakesh'='Laddu'\"),",
                " format: 'query') ~> source1",
                "source(allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false) ~> source2",
                "source1, source2 union(byName: false)~> union1",
                "union1 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:false,",
                " format: 'table',",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
Dataflow JSON for the new tables:
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "source_csvs",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [],
            "scriptLines": [
                "source(allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false) ~> source1",
                "source1 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:false,",
                " recreate:true,",
                " format: 'table',",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
Pipeline JSON:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "Get_csv_list",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Lookup1",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": {
                                        "value": "IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL \n SELECT 1 AS res ELSE SELECT 0 AS res;",
                                        "type": "Expression"
                                    },
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "AzureSqlTable1",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "table_name": "''"
                                    }
                                },
                                "firstRowOnly": true
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Lookup1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@equals(activity('Lookup1').output.firstRow.res, 1)",
                                    "type": "Expression"
                                },
                                "ifFalseActivities": [
                                    {
                                        "name": "Data flow2",
                                        "type": "ExecuteDataFlow",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "0.12:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "dataflow": {
                                                "referenceName": "dataflow2",
                                                "type": "DataFlowReference",
                                                "datasetParameters": {
                                                    "source1": {
                                                        "filename": {
                                                            "value": "@item().name",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "sink1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    }
                                                }
                                            },
                                            "compute": {
                                                "coreCount": 8,
                                                "computeType": "General"
                                            },
                                            "traceLevel": "Fine"
                                        }
                                    }
                                ],
                                "ifTrueActivities": [
                                    {
                                        "name": "Data flow1",
                                        "type": "ExecuteDataFlow",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "0.12:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "dataflow": {
                                                "referenceName": "dataflow1",
                                                "type": "DataFlowReference",
                                                "parameters": {
                                                    "table_name": {
                                                        "value": "'@{split(item().name,'.')[0]}'",
                                                        "type": "Expression"
                                                    }
                                                },
                                                "datasetParameters": {
                                                    "source1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "source2": {
                                                        "filename": {
                                                            "value": "@item().name",
                                                            "type": "Expression"
                                                        }
                                                    },
                                                    "sink1": {
                                                        "table_name": {
                                                            "value": "@split(item().name,'.')[0]",
                                                            "type": "Expression"
                                                        }
                                                    }
                                                }
                                            },
                                            "compute": {
                                                "coreCount": 8,
                                                "computeType": "General"
                                            },
                                            "traceLevel": "Fine"
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}