Search code examples
azureazure-data-factory

How create concat array within a variable for lookup activity output, iterated within foreach loop


ImageI am currently reading a SQL Table which has more than 5000 records. Since Lookup activity doesnt support more than 5000 Records. I had to create a foreach loop which will iterate based on totalrecords/5000 and inside lookup will fetch first 5000 records then for next ietration it will fetch another 5000 and so on. however i am stuck on how to pass the each lookup activity output array to a variable.

My Pipeline look like this.

{
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "GetRowCount_FromMyTable",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": {
                            "value": "SELECT COUNT(*) as TotalCount FROM MyTable",
                            "type": "Expression"
                        },
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "ds_sql_extraction",
                        "type": "DatasetReference"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "ds_sql_extraction",
                        "type": "DatasetReference"
                    }
                ],
                "linkedServiceName": {
                    "referenceName": "MyDatabase",
                    "type": "LinkedServiceReference"
                }
            },
            {
                "name": "IterativeLookup",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetRowCount_OffSetTable",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(0, add(div(activity('GetRowCount_OffSetTable').output.firstRow.TotalCount, 5000), 1))",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "LookupActivity",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "LookupIterations",
                                    "value": "@{item()}"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT * FROM MyTable ORDER BY OffsetValue OFFSET @{mul(int(item()), 5000)} ROWS FETCH NEXT 5000 ROWS ONLY\n\n",
                                        "type": "Expression"
                                    },
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "ds_sql_extraction",
                                    "type": "DatasetReference"
                                },
                                "firstRowOnly": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "ds_sql_extraction",
                                    "type": "DatasetReference"
                                }
                            ]
                        },
                        {
                            "name": "Set variable1",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "LookupActivity",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "LookupArray",
                                "value": {
                                    "value": "@string(item().value)",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "LookupArray": {
                "type": "Array"
            },
            "AnotherArray": {
                "type": "Array"
            },
            "LookupString": {
                "type": "String"
            },
            "Stringg": {
                "type": "Array"
            },
            "Test": {
                "type": "String"
            },
            "test2": {
                "type": "Array"
            },
            "test1": {
                "type": "String"
            },
            "NewArraySet": {
                "type": "Array"
            }
        },
        "folder": {
            "name": "Data_Extraction"
        },
        "annotations": []
    }
}

how to get the two LookupActivity (Iterated) outputs to one single variable?


Solution

  • As you don't have any duplicates in your table, you can union the lookup output array in each iteration with its previous iteration lookup output array to get the final array.

    Here, for sample instead of 5000 rows, I took 4 rows as limit where my table consists of total 16 rows and my id column is equivalent to your OffsetValue column.

    First, create two array variables temp_arr and res_arr with empty default values like below.

    enter image description here

    I have followed same approach and same queries as yours till the second lookup activity.

    Inside For-Each, after second lookup activity, take a set variable activity for temp_arr and give @variables('res_arr') to it.

    enter image description here

    Next, take another set variable activity for res_arr variable and give the below expression.

    @union(variables('temp_arr'),activity('Lookup2').output.value)
    

    enter image description here

    In ADF, self-referencing variables is not supported, that is the reason why the temp_arr variable was used.

    At the end of the For-loop, the result final array will be stored in the res_arr variable.

    Result:

    enter image description here

    UPDATE:

    Your activities are running parallelly inside For-Each. You need to check the Sequential check box in the For-Each activity to run the activities sequentially.

    enter image description here

    The output array will be stored in the res_arr variable. If you want to access the array after For-Each, you need to use this array.

    My pipeline JSON for your reference:

    Change the datasets name and query and activity names as per your requirement and use it.

    {
        "name": "lookup concat pipeline",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "AzureSqlSource",
                            "sqlReaderQuery": "SELECT COUNT(*) as TotalCount FROM lookup1",
                            "queryTimeout": "02:00:00",
                            "partitionOption": "None"
                        },
                        "dataset": {
                            "referenceName": "lookup_table",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": true
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@range(0, add(div(activity('Lookup1').output.firstRow.TotalCount, 4), 1))",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Lookup2",
                                "type": "Lookup",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "AzureSqlSource",
                                        "sqlReaderQuery": {
                                            "value": "SELECT * FROM lookup1 ORDER BY id OFFSET @{mul(int(item()), 4)} ROWS FETCH NEXT 4 ROWS ONLY",
                                            "type": "Expression"
                                        },
                                        "queryTimeout": "02:00:00",
                                        "partitionOption": "None"
                                    },
                                    "dataset": {
                                        "referenceName": "lookup_table",
                                        "type": "DatasetReference"
                                    },
                                    "firstRowOnly": false
                                }
                            },
                            {
                                "name": "temp_arr",
                                "type": "SetVariable",
                                "dependsOn": [
                                    {
                                        "activity": "Lookup2",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "temp_arr",
                                    "value": {
                                        "value": "@variables('res_arr')",
                                        "type": "Expression"
                                    }
                                }
                            },
                            {
                                "name": "union temp and and lookup array",
                                "type": "SetVariable",
                                "dependsOn": [
                                    {
                                        "activity": "temp_arr",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "res_arr",
                                    "value": {
                                        "value": "@union(variables('temp_arr'),activity('Lookup2').output.value)",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "temp_arr": {
                    "type": "Array"
                },
                "res_arr": {
                    "type": "Array"
                }
            },
            "annotations": [],
            "lastPublishTime": "2024-05-29T04:16:16Z"
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }