I'm working with a dataset where I need to drop columns that contain only NULL values. The column names are not consistent or similar, and they can change over time. Is there a way in ADF to drop a column if all of its values are NULL, without ending up with drifted columns?
I have tried unpivoting, removing rows, and then re-pivoting. However, after I pivot the data back to its original format, I get the following message:
"This drifted column is not in the source schema and therefore can only be referenced with pattern matching expressions"
The drifted columns don't seem to join in subsequent join transformations. I have also tried using derived columns with regex column patterns to make all the drifted columns explicit. However, the byName() function doesn't seem to work with the $$ syntax, namely:
toString(byName($$))
Any ideas on how to solve this within Azure Data Factory Data Flows would be very much appreciated!
so that I can aggregate all other rows, using this new column to group.
@activity('Data flow1').output.runstatus.profile.sink1.total
In pipeline2, I have used activities to get the columns that are not entirely null, build a dynamic schema from them, and then use this schema as the copy mapping so that only the required columns are written to a file.
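First, I read the file written at the end of the data flow without a header (even though the file has one), so the Lookup returns the columns as Prop_0, Prop_1, and so on, with the original header as the first data row. The JSON for pipeline2 is shown below: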
{
"name": "pipeline2",
"properties": {
"activities": [
{
"name": "Lookup1",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"dataset": {
"referenceName": "cols",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@range(0,pipeline().parameters.count_of_rows)",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "props",
"value": {
"value": "Prop_@{item()}",
"type": "Expression"
}
}
}
]
}
},
{
"name": "ForEach2",
"type": "ForEach",
"dependsOn": [
{
"activity": "ForEach1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@variables('props')",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable2",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "req_cols",
"value": {
"value": "@if(and(not(equals(activity('Lookup1').output.value[0][item()],'tp')),not(equals(activity('Lookup1').output.value[1][item()],'2'))),activity('Lookup1').output.value[0][item()],'')",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Filter1",
"type": "Filter",
"dependsOn": [
{
"activity": "ForEach2",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@variables('req_cols')",
"type": "Expression"
},
"condition": {
"value": "@not(equals(item(),''))",
"type": "Expression"
}
}
},
{
"name": "ForEach3",
"type": "ForEach",
"dependsOn": [
{
"activity": "Filter1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Filter1').output.Value",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable3",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "mapping",
"value": {
"value": "@json(concat('{\"source\":{\"name\":\"',item(),'\"},\"sink\":{\"name\":\"',item(),'\"}}'))",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach3",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "dynamic_schema",
"value": {
"value": "@concat('{\"type\":\"TabularTranslator\",\"mappings\":',string(variables('mapping')),'}')",
"type": "Expression"
}
}
},
{
"name": "Copy data1",
"type": "Copy",
"dependsOn": [
{
"activity": "Set variable1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"translator": {
"value": "@json(variables('dynamic_schema'))",
"type": "Expression"
}
},
"inputs": [
{
"referenceName": "csv1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "req_file",
"type": "DatasetReference"
}
]
}
],
"parameters": {
"count_of_rows": {
"type": "int"
}
},
"variables": {
"props": {
"type": "Array"
},
"req_cols": {
"type": "Array"
},
"test": {
"type": "String"
},
"mapping": {
"type": "Array"
},
"dynamic_schema": {
"type": "String"
}
},
"annotations": []
}
}
NOTE: In the copy data activity, the source is the original file.
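
To make the column selection easier to follow outside ADF, here is a rough Python sketch of what ForEach1, ForEach2, Filter1, ForEach3 and Set variable1 appear to do together. It is not part of the pipeline. The function name build_translator and the sample rows are made up for illustration, and I am assuming that the second Lookup row holds a per-column count as text, that 'tp' is the helper grouping column, and that '2' is the count that marks a column as entirely null in the sample data.

```python
import json


def build_translator(lookup_rows, count, helper_col="tp", all_null_count="2"):
    """Hypothetical re-creation of the mapping logic in pipeline2.

    lookup_rows[0]: Prop_i -> original column name (header row read as data).
    lookup_rows[1]: Prop_i -> per-column count as a string (assumed meaning).
    count: number of Prop_i keys to walk, like the count_of_rows parameter.
    """
    names_row, counts_row = lookup_rows[0], lookup_rows[1]

    # ForEach1: build Prop_0 .. Prop_{count-1}
    props = [f"Prop_{i}" for i in range(count)]

    # ForEach2: keep the column name unless it is the helper column or its
    # count equals the all-null marker; otherwise append an empty string
    req_cols = [
        names_row[p]
        if names_row[p] != helper_col and counts_row[p] != all_null_count
        else ""
        for p in props
    ]

    # Filter1: drop the empty strings
    kept = [c for c in req_cols if c != ""]

    # ForEach3 + Set variable1: one source/sink pair per kept column
    mappings = [{"source": {"name": c}, "sink": {"name": c}} for c in kept]
    return {"type": "TabularTranslator", "mappings": mappings}


# Made-up sample: column "a" has count "2" and "tp" is the helper column
rows = [
    {"Prop_0": "id", "Prop_1": "a", "Prop_2": "b", "Prop_3": "tp"},
    {"Prop_0": "0", "Prop_1": "2", "Prop_2": "1", "Prop_3": "0"},
]
translator = build_translator(rows, count=4)
print(json.dumps(translator))
```

With these sample rows only id and b end up in the mapping. The resulting object has the same shape as the value the copy data activity receives through its translator property.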