json, azure, azure-data-factory

Merging JSON files into one and adding the filename to the data before it is merged, using ADF


I'm using this approach to merge my individual JSON files into one, and it works:

Using the ADF copy activity: use a wildcard path in the source with * in the filename. Then, in the sink, use the merge option so that the files are merged into one JSON blob.

All the merged data looks like this in the big JSON file:

{data from file1}
.
.
{data from file2}
.
.
{data from file3} 

The requirement is to have the final format like this:

{"File1.json":
[{
<file1 data>
}], 
"File2.json":
[{
<file2 data>
}]
}

Edit: I'm not looking to use a lookup activity to fetch the data, as it has a size limit.

Is this possible using ADF? If not, please suggest an alternative.


Solution

  • You can use get metadata, for each, lookup, append variable, set variable and copy data activities to get the desired result.

    1. First, use a get metadata activity to get the list of files in your directory.
    2. Iterate through the JSON files returned by this get metadata activity with a for each activity.
    3. Inside the for each activity, read each file with a lookup activity, then use an append variable activity to append each item as "file_name":[<file_data>].
    4. Outside the for each activity, use a set variable activity to join the appended items with commas and wrap them in braces, which gives the required data as a string.
    5. Finally, use a copy data activity to write this string out as the required JSON. The following is the complete pipeline JSON:
    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Get Metadata1",
                    "type": "GetMetadata",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "Json2",
                            "type": "DatasetReference"
                        },
                        "fieldList": [
                            "childItems"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Get Metadata1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Get Metadata1').output.childItems",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Lookup1",
                                "type": "Lookup",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "JsonSource",
                                        "storeSettings": {
                                            "type": "AzureBlobFSReadSettings",
                                            "recursive": true,
                                            "enablePartitionDiscovery": false
                                        },
                                        "formatSettings": {
                                            "type": "JsonReadSettings"
                                        }
                                    },
                                    "dataset": {
                                        "referenceName": "Json3",
                                        "type": "DatasetReference"
                                    },
                                    "firstRowOnly": false
                                }
                            },
                            {
                                "name": "Append variable1",
                                "type": "AppendVariable",
                                "dependsOn": [
                                    {
                                        "activity": "Lookup1",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "final",
                                    "value": {
                                        "value": "\"@{item().name}\":@{activity('Lookup1').output.value}",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Set variable1",
                    "type": "SetVariable",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "required",
                        "value": {
                            "value": "{@{join(variables('final'),',')}}",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "Copy data1",
                    "type": "Copy",
                    "dependsOn": [
                        {
                            "activity": "Set variable1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "additionalColumns": [
                                {
                                    "name": "required",
                                    "value": {
                                        "value": "@variables('required')",
                                        "type": "Expression"
                                    }
                                }
                            ],
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "sink": {
                            "type": "DelimitedTextSink",
                            "storeSettings": {
                                "type": "AzureBlobFSWriteSettings"
                            },
                            "formatSettings": {
                                "type": "DelimitedTextWriteSettings",
                                "quoteAllText": true,
                                "fileExtension": ".txt"
                            }
                        },
                        "enableStaging": false,
                        "translator": {
                            "type": "TabularTranslator",
                            "mappings": [
                                {
                                    "source": {
                                        "name": "required",
                                        "type": "String"
                                    },
                                    "sink": {
                                        "type": "String",
                                        "physicalType": "String",
                                        "ordinal": 1
                                    }
                                }
                            ],
                            "typeConversion": true,
                            "typeConversionSettings": {
                                "allowDataTruncation": true,
                                "treatBooleanAsNumber": false
                            }
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "csv1",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "DelimitedText1",
                            "type": "DatasetReference"
                        }
                    ]
                }
            ],
            "variables": {
                "final": {
                    "type": "Array"
                },
                "required": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
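
The two expressions in the pipeline above (the append variable value and the set variable value) can be sketched outside ADF to see what string they produce. This is only an illustration in Python, not part of the pipeline: `build_fragment`, `build_document` and the in-memory `files` dictionary are hypothetical names standing in for the lookup output of each file, and it assumes ADF interpolates the lookup array as its JSON text.

```python
import json


def build_fragment(file_name, rows):
    # Mirrors the append variable expression:
    #   "\"@{item().name}\":@{activity('Lookup1').output.value}"
    # Assumption: the lookup output array is interpolated as JSON text.
    return f'"{file_name}":{json.dumps(rows, ensure_ascii=False)}'


def build_document(fragments):
    # Mirrors the set variable expression:
    #   "{@{join(variables('final'),',')}}"
    return "{" + ",".join(fragments) + "}"


# Hypothetical stand-in for the files returned by get metadata and lookup.
files = {
    "sample1.json": [{"name": "john", "surname": "elo"}],
    "sample2.json": [{"name": "Привет"}, {"name": "Привет"}],
}

final = []  # plays the role of the 'final' array variable
for name, rows in files.items():
    final.append(build_fragment(name, rows))

required = build_document(final)  # plays the role of the 'required' variable
merged = json.loads(required)     # the joined string parses as one JSON object
print(list(merged))
```

Because the result is assembled by string concatenation, a file name containing a double quote or a backslash would produce invalid JSON in this sketch, and the same is likely true of the pipeline expressions.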
    
    • In the final copy data activity, use a delimited text source with any sample data (1 column and 1 row), and create an additional column whose value is the variable set by the set variable activity.

    • For the sink, create a delimited text dataset configured as shown in the image below:

    [image: sink dataset configuration]

    • Running the pipeline gives the desired result. The following is how the data looks after the above pipeline is run:

    {
       "sample1.json":[
          {
             "KTYPE":[
                {
                   "name":"john",
                   "surname":"elo"
                },
                {
                   "name":"dd",
                   "surname":"ss"
                }
             ],
             "MTYPE":[
                {
                   "name":"dsdsd",
                   "id":"elo"
                },
                {
                   "name":"sdss",
                   "id":"sds22"
                }
             ]
          }
       ],
       "sample2.json":[
          {
             "name":"Привет"
          },
          {
             "name":"Привет"
          }
       ],
       "sample3.json":[
          {
             "id":1,
             "first_name":"Catlin",
             "last_name":"Haysman",
             "email":"[email protected]",
             "gender":"Female",
             "ip_address":"80.243.124.118"
          },
          {
             "id":2,
             "first_name":"Augustin",
             "last_name":"Nesbeth",
             "email":"[email protected]",
             "gender":"Male",
             "ip_address":"250.126.164.4"
          }
       ],
       "sample4.json":[
          {
             "id":3,
             "first_name":"Layla",
             "last_name":"Morant",
             "email":"[email protected]",
             "gender":"Female",
             "ip_address":"247.73.128.196"
          },
          {
             "id":4,
             "first_name":"Ophelie",
             "last_name":"Rape",
             "email":"[email protected]",
             "gender":"Female",
             "ip_address":"148.213.192.8"
          }
       ]
    }
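
Since the output file is written through a delimited text sink rather than a JSON sink, it may be worth checking that the result really parses as JSON in the required shape. The following is a minimal sketch in Python, run outside ADF; `check_merged` is a hypothetical helper, and the `sample` string is a shortened stand-in for the file contents, not the actual output.

```python
import json


def check_merged(text):
    """Parse the merged text and return the number of records per file name."""
    merged = json.loads(text)  # raises ValueError if the text is not valid JSON
    if not isinstance(merged, dict):
        raise ValueError("top level must be an object keyed by file name")
    not_lists = [name for name, rows in merged.items() if not isinstance(rows, list)]
    if not_lists:
        raise ValueError(f"values must be arrays, check: {not_lists}")
    return {name: len(rows) for name, rows in merged.items()}


# Shortened stand-in for the contents of the merged output file.
sample = (
    '{"sample2.json":[{"name":"Привет"},{"name":"Привет"}],'
    '"sample3.json":[{"id":1},{"id":2}]}'
)

counts = check_merged(sample)
print(counts)
```

If the sink wraps the value in quotes or escapes inner quotes, `json.loads` fails here, which points to the quote and escape settings of the sink dataset as the first thing to inspect.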