Dropping columns in Azure Data Factory based on values in columns


I'm working with a dataset where I need to drop the columns that contain only NULL values. The problem is that the column names follow no consistent pattern and can change over time. Is there a way in ADF to drop a column when all of its values are NULL, without ending up with drifted columns?

I have tried unpivoting, removing the null rows, and then re-pivoting. However, after I pivot the data back to its original format, I get the following message:

"This drifted column is not in the source schema and therefore can only be referenced with pattern matching expressions"

The drifted columns also don't seem to work in subsequent Join transformations. I have tried adding derived columns with regex column patterns to make all the drifted columns explicit. However, the byName() function doesn't seem to work with the $$ syntax, namely:

toString(byName($$))

Any ideas of how to solve this within Azure Data Factory - Data Flows would be very much appreciated!


Solution

    • I used a combination of Data Factory pipeline activities and a data flow to meet the requirement.
    • First, I created a data flow that writes its output to a file. In it, I added a new column, tp, with the value 1 in every row, so that an Aggregate can group all rows together on this column.

    • In an Aggregate transformation grouped by the column created above, I used collect() to build an array of values for each of the other columns.

    • Next, add another Derived Column transformation that replaces each array with the length of its string form. An empty array converts to the string [], so a length of 2 means the column contains only nulls.

    • Write this data flow output to a file. The sink then holds a header row plus a single data row containing, for each column, the length computed above.

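    • In the first pipeline, use a Data flow activity to run the above data flow. Follow it with an Execute Pipeline activity that runs pipeline2, which filters the data and writes only the required columns. Pass the following dynamic content as pipeline2's count_of_rows parameter. pipeline2 uses this value to generate one Prop_<n> name per column of the summary file:
    @activity('Data flow1').output.runStatus.profile.sink1.total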
    

    • In pipeline2, I used activities to find the columns that are not entirely null and to build a dynamic schema from them. That schema then serves as the copy mapping, so only the required columns are written to a file.

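    • First, I read the file written at the end of the data flow without a header (even though the file has one). The columns then get the default names Prop_0, Prop_1 and so on. As a result, the first row of the Lookup output holds the original column names and the second row holds the computed lengths.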

    • You can directly use the following pipeline JSON to build the pipeline:
    {
        "name": "pipeline2",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "dataset": {
                            "referenceName": "cols",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@range(0,pipeline().parameters.count_of_rows)",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable1",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "props",
                                    "value": {
                                        "value": "Prop_@{item()}",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "ForEach2",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('props')",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable2",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "req_cols",
                                    "value": {
                                        "value": "@if(and(not(equals(activity('Lookup1').output.value[0][item()],'tp')),not(equals(activity('Lookup1').output.value[1][item()],'2'))),activity('Lookup1').output.value[0][item()],'')",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Filter1",
                    "type": "Filter",
                    "dependsOn": [
                        {
                            "activity": "ForEach2",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('req_cols')",
                            "type": "Expression"
                        },
                        "condition": {
                            "value": "@not(equals(item(),''))",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "ForEach3",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Filter1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Filter1').output.value",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable3",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "mapping",
                                    "value": {
                                        "value": "@json(concat('{\"source\":{\"name\":\"',item(),'\"},\"sink\":{\"name\":\"',item(),'\"}}'))",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Set variable1",
                    "type": "SetVariable",
                    "dependsOn": [
                        {
                            "activity": "ForEach3",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "dynamic_schema",
                        "value": {
                            "value": "@concat('{\"type\":\"TabularTranslator\",\"mappings\":',string(variables('mapping')),'}')",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "Copy data1",
                    "type": "Copy",
                    "dependsOn": [
                        {
                            "activity": "Set variable1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "sink": {
                            "type": "DelimitedTextSink",
                            "storeSettings": {
                                "type": "AzureBlobFSWriteSettings"
                            },
                            "formatSettings": {
                                "type": "DelimitedTextWriteSettings",
                                "quoteAllText": true,
                                "fileExtension": ".txt"
                            }
                        },
                        "enableStaging": false,
                        "translator": {
                            "value": "@json(variables('dynamic_schema'))",
                            "type": "Expression"
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "csv1",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "req_file",
                            "type": "DatasetReference"
                        }
                    ]
                }
            ],
            "parameters": {
                "count_of_rows": {
                    "type": "int"
                }
            },
            "variables": {
                "props": {
                    "type": "Array"
                },
                "req_cols": {
                    "type": "Array"
                },
                "test": {
                    "type": "String"
                },
                "mapping": {
                    "type": "Array"
                },
                "dynamic_schema": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
    

    NOTE: In the Copy data activity, the source dataset (csv1) is the original file, not the summary file written by the data flow.
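To check the logic of the steps above outside ADF, the two stages can be simulated in plain Python. This is a sketch under stated assumptions, not ADF code. The sample rows and the function names (profile_columns, read_without_header, build_dynamic_schema, copy_with_translator) are hypothetical, and in-memory dicts stand in for the files and the Lookup output. The sketch assumes, as the length-2 check relies on, that collect() leaves out nulls, so an all-null column yields an empty array. Python's str() formats non-empty lists differently from ADF's toString(), but only the empty case ("[]", length 2) matters for the check.

```python
import json

# Plain-Python simulation of the ADF approach; all names and data are hypothetical.
# None plays the role of NULL in the source file.
SOURCE_ROWS = [
    {"id": "1", "name": "a", "unused": None, "notes": None},
    {"id": "2", "name": None, "unused": None, "notes": "x"},
]


def profile_columns(rows):
    """Data flow stage: group every row under the constant column 'tp' (= 1),
    collect each column's non-null values, then replace each array with the
    length of its string form. An empty list stringifies to '[]' (length 2)."""
    summary = {"tp": 1}
    for col in rows[0]:
        collected = [r[col] for r in rows if r[col] is not None]
        summary[col] = len(str(collected))
    return summary


def read_without_header(summary):
    """Lookup stage: reading the summary file with the header option off turns
    the header line into row 0 and the lengths into row 1, under the default
    column names Prop_0, Prop_1, and so on. Values are read back as strings."""
    names = list(summary)
    header = {f"Prop_{i}": name for i, name in enumerate(names)}
    lengths = {f"Prop_{i}": str(summary[name]) for i, name in enumerate(names)}
    return [header, lengths]


def build_dynamic_schema(lookup, count_of_rows):
    """ForEach/Filter/Set variable stages: keep every column that is not the
    helper 'tp' column and whose length is not '2', then wrap the mappings in
    a TabularTranslator JSON string that closes with a single brace."""
    props = [f"Prop_{i}" for i in range(count_of_rows)]
    req_cols = [
        lookup[0][p] if lookup[0][p] != "tp" and lookup[1][p] != "2" else ""
        for p in props
    ]
    mapping = [
        {"source": {"name": c}, "sink": {"name": c}} for c in req_cols if c != ""
    ]
    return '{"type":"TabularTranslator","mappings":' + json.dumps(mapping) + "}"


def copy_with_translator(rows, dynamic_schema):
    """Copy stage: parse the schema string (the role json() plays in ADF) and
    project each original source row onto the mapped columns."""
    translator = json.loads(dynamic_schema)
    return [
        {m["sink"]["name"]: r[m["source"]["name"]] for m in translator["mappings"]}
        for r in rows
    ]


if __name__ == "__main__":
    summary = profile_columns(SOURCE_ROWS)
    lookup = read_without_header(summary)
    schema = build_dynamic_schema(lookup, count_of_rows=len(summary))
    print(copy_with_translator(SOURCE_ROWS, schema))
```

With the sample rows, the all-null "unused" column gets length 2 and is dropped. The helper "tp" column is also excluded, so only id, name and notes are copied. In the sketch, count_of_rows is simply the number of columns in the summary. The schema string deliberately ends with one closing brace: if an extra "}" were appended, json.loads would raise json.JSONDecodeError ("Extra data").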