Conditional sorting in Azure Data Factory


Let's assume I have 3 columns (id, language, name). I have multiple rows with the same ID but different language and name values.

e.g.:

id, language, name
123, German,  Blume
123, English, Flower
123, Spanish, Flora
456, Netherlands, Hoos
456, Spanish, casa
456, English, House

Now I need a flowlet that checks, for each ID, whether a German version is available. If so, keep the German row and drop all other rows with that ID, so that only one row per ID remains. If there is no German version, fall back to English and drop the rest. And so on down the list.

This is my order:

when language == German, first rank
when language == English, second rank
when language == French, third rank
...

So the final list after the dataflow would be only the distinct ids with the respective name.

123, Blume
456, House 

Solution

  • You can achieve this with the following approach.

    First, create an array parameter cond_arr in the data flow that holds your languages in priority order: ["German","English","French"].

    After the source, add a Filter transformation that keeps only the rows whose language value appears in that array, using the expression contains($cond_arr,#item==language).

    This leaves only the rows in one of the ranked languages. Next, add a Derived Column transformation and create a new column cond_arr with the expression below.

    mapIndex($cond_arr,concat(#item,'_',toString(#index)))
    

    This appends its index to each item (for example German_1, English_2) and stores the same array in every row.
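
    As a rough illustration only, the filter and tagging steps above can be pictured in Python. The names rows, kept, map_index and tagged are hypothetical stand-ins, not part of the data flow, and the sketch assumes that #index in mapIndex starts at 1.

```python
# Hypothetical stand-in for the data flow parameter $cond_arr (priority order).
cond_arr = ["German", "English", "French"]

# Sample rows from the question: (id, language, name).
rows = [
    ("123", "German", "Blume"),
    ("123", "English", "Flower"),
    ("123", "Spanish", "Flora"),
    ("456", "Netherlands", "Hoos"),
    ("456", "Spanish", "casa"),
    ("456", "English", "House"),
]

# Filter step: keep only rows whose language appears in cond_arr.
kept = [row for row in rows if row[1] in cond_arr]

def map_index(items):
    """Mimic mapIndex(items, concat(#item, '_', toString(#index))), assuming a 1-based #index."""
    return [f"{item}_{index}" for index, item in enumerate(items, start=1)]

tagged = map_index(cond_arr)

print(kept)    # [('123', 'German', 'Blume'), ('123', 'English', 'Flower'), ('456', 'English', 'House')]
print(tagged)  # ['German_1', 'English_2', 'French_3']
```

    In this sketch the position in cond_arr is the only thing that defines the priority, so reordering the parameter is enough to change which language wins.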

    After that, add another Derived Column transformation and create a column rank1 with the expression below.

    toInteger(split(find(cond_arr,split(#item,'_')[1]==language),'_')[2])
    

    This looks up each row's language value in the array and returns its rank.
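
    To make the lookup concrete, here is a minimal Python sketch of what the find/split expression is doing. The function find_rank and the variable tagged are hypothetical names; returning None for a missing language is an assumption of this sketch, since the earlier filter is meant to make that case impossible.

```python
# Array produced by the previous step, repeated here so the sketch is self-contained.
tagged = ["German_1", "English_2", "French_3"]

def find_rank(tagged_items, language):
    """Mimic toInteger(split(find(arr, split(#item,'_')[1] == language), '_')[2])."""
    for item in tagged_items:
        parts = item.split("_")      # e.g. ["English", "2"]
        if parts[0] == language:     # first part is the language name
            return int(parts[1])     # second part is the rank
    return None                      # no match (assumption: filtered out beforehand)

print(find_rank(tagged, "German"))   # 1
print(find_rank(tagged, "English"))  # 2
```

    Because the rank is recovered by splitting on '_', a language value that itself contains an underscore would be misread, so a different separator may be safer for such data.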

    Next, add an Aggregate transformation and group by the id column. In the Aggregates section, create two columns: collect_arr with the expression collect(@(name,rank1)) and min_rank with the expression min(rank1).

    collect_arr gathers the name and rank1 values of each id into an array, and min_rank holds the lowest (best) rank for that id.

    Now, add another Derived Column transformation and create a new column res_name with the expression below. It extracts the name from the collect_arr entry whose rank1 matches min_rank.

    split(split(toString(mapIf(collect_arr, startsWith(split(toString(#item),'rank1\":')[2],toString(min_rank)),#item)[1]),'name":"')[2],'"')[1]
    

    Finally, remove the extra columns with a Select transformation to get the desired result.
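
    The aggregate and res_name steps can be sanity-checked with a small Python sketch. It is not the data flow itself: pick_names and ranked_rows are hypothetical names, and the sketch compares ranks as integers rather than parsing the stringified structure as the expression does.

```python
from collections import defaultdict

# Rows as they would look after the rank step: (id, name, rank1).
ranked_rows = [
    ("123", "Blume", 1),
    ("123", "Flower", 2),
    ("456", "House", 2),
]

def pick_names(rows):
    """Group by id, collect (name, rank1) pairs, and keep the name with the lowest rank."""
    collected = defaultdict(list)              # plays the role of collect_arr per id
    for row_id, name, rank in rows:
        collected[row_id].append((name, rank))

    result = {}
    for row_id, pairs in collected.items():
        min_rank = min(rank for _, rank in pairs)  # plays the role of min_rank
        # First collected entry whose rank equals the minimum.
        result[row_id] = next(name for name, rank in pairs if rank == min_rank)
    return result

print(pick_names(ranked_rows))  # {'123': 'Blume', '456': 'House'}
```

    One difference is worth noting: the data flow expression matches the rank with startsWith on a string, so with ten or more languages a min_rank of 1 could also match rank 10. If the priority list grows that long, it is probably worth testing that case or switching to an exact comparison.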

    My Dataflow JSON for your reference:

    {
        "name": "dataflow2",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "source_csv",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "dataset": {
                            "referenceName": "DelimitedText1",
                            "type": "DatasetReference"
                        },
                        "name": "sink1"
                    }
                ],
                "transformations": [
                    {
                        "name": "filter1"
                    },
                    {
                        "name": "derivedColumn1"
                    },
                    {
                        "name": "derivedColumn2"
                    },
                    {
                        "name": "aggregate1"
                    },
                    {
                        "name": "derivedColumn3"
                    },
                    {
                        "name": "select1"
                    }
                ],
                "scriptLines": [
                    "parameters{",
                    "     cond_arr as string[] ([\"German\",\"English\",\"French\"])",
                    "}",
                    "source(output(",
                    "          id as string,",
                    "          language as string,",
                    "          name as string",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     ignoreNoFilesFound: false) ~> source1",
                    "source1 filter(contains($cond_arr,#item==language)) ~> filter1",
                    "filter1 derive(cond_arr = mapIndex($cond_arr,concat(#item,'_',toString(#index)))) ~> derivedColumn1",
                    "derivedColumn1 derive(rank1 = toInteger(split(find(cond_arr,split(#item,'_')[1]==language),'_')[2])) ~> derivedColumn2",
                    "derivedColumn2 aggregate(groupBy(id),",
                    "     collect_arr = collect(@(name,rank1)),",
                    "          min_rank = min(rank1)) ~> aggregate1",
                    "aggregate1 derive(res_name = split(split(toString(mapIf(collect_arr, startsWith(split(toString(#item),'rank1\\\":')[2],toString(min_rank)),#item)[1]),'name\":\"')[2],'\"')[1]) ~> derivedColumn3",
                    "derivedColumn3 select(mapColumn(",
                    "          id,",
                    "          res_name",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select1",
                    "select1 sink(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     umask: 0022,",
                    "     preCommands: [],",
                    "     postCommands: [],",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> sink1"
                ]
            }
        }
    }