Tags: python, dataframe, join, pyspark, apache-spark-sql

Pyspark window function to generate rank on data based on sequence of the value of a column


I have a dataframe df:

df = 
A   B   type
X   11  typeA
X   12  typeA
X   13  typeB
X   14  typeB
X   15  typeC
X   16  typeC
Y   17  typeA
Y   18  typeA
Y   19  typeB
Y   20  typeB
Y   21  typeC
Y   22  typeC

Now I want to create a new column rank, using a window partitioned by column 'A' and ordered by column 'type'.

But I want a specific sequence: typeA - typeB - typeC - typeA - typeB - typeC - .... The result dataframe I am looking for is:

result = 
A   B   type    rank
X   11  typeA   1
X   13  typeB   2
X   15  typeC   3
X   12  typeA   4
X   14  typeB   5
X   16  typeC   6
Y   17  typeA   1
Y   19  typeB   2
Y   21  typeC   3
Y   18  typeA   4
Y   20  typeB   5
Y   22  typeC   6

What I am doing right now:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

result = (df
        .withColumn("rank", row_number()
        .over(Window.partitionBy('A')
        .orderBy(col('type'))
        )
        )
        ).persist()

This does not quite give the sequence I am looking for, since ordering by type alone keeps rows of the same type adjacent:

A   B   type    rank
X   11  typeA   1
X   12  typeA   2
X   13  typeB   3
X   14  typeB   4
X   15  typeC   5
X   16  typeC   6
Y   17  typeA   1
Y   18  typeA   2
Y   19  typeB   3
Y   20  typeB   4
Y   21  typeC   5
Y   22  typeC   6

Any idea how to generate this sequence when defining the window function?


Solution

  • You can try something like this with an inner window:

    from pyspark.sql import Window
    import pyspark.sql.functions as F
    
    simpleData = [
        ("X", 11, "typeA"),
        ("X", 12, "typeA"),
        ("X", 13, "typeB"),
        ("X", 14, "typeB"),
        ("X", 15, "typeC"),
        ("X", 16, "typeC"),
        ("Y", 17, "typeA"),
        ("Y", 18, "typeA"),
        ("Y", 19, "typeB"),
        ("Y", 20, "typeB"),
        ("Y", 21, "typeC"),
        ("Y", 22, "typeC"),
    ]
    
    schema = ["A", "B", "type"]
    df = spark.createDataFrame(data=simpleData, schema=schema)
    
    # Inner window: rows within each (A, type) group, ordered by B
    w1 = Window.partitionBy("A", "type").orderBy("B")
    
    # Running count over w1 gives each row its position within its (A, type) group
    dfWithInnerRank = df.withColumn("innerRank", F.sum(F.lit(1)).over(w1))
    
    # Outer window: ordering by the inner position first interleaves the types
    w2 = Window.partitionBy("A").orderBy("innerRank", "B")
    
    dfWithInnerRank.withColumn("rank", F.row_number().over(w2)).drop("innerRank").show()
    

    Output:

    +---+---+-----+----+
    |  A|  B| type|rank|
    +---+---+-----+----+
    |  X| 11|typeA|   1|
    |  X| 13|typeB|   2|
    |  X| 15|typeC|   3|
    |  X| 12|typeA|   4|
    |  X| 14|typeB|   5|
    |  X| 16|typeC|   6|
    |  Y| 17|typeA|   1|
    |  Y| 19|typeB|   2|
    |  Y| 21|typeC|   3|
    |  Y| 18|typeA|   4|
    |  Y| 20|typeB|   5|
    |  Y| 22|typeC|   6|
    +---+---+-----+----+
    

    If you are interested, the intermediate result with innerRank looks like this:

    +---+---+-----+---------+----+
    |  A|  B| type|innerRank|rank|
    +---+---+-----+---------+----+
    |  X| 11|typeA|        1|   1|
    |  X| 13|typeB|        1|   2|
    |  X| 15|typeC|        1|   3|
    |  X| 12|typeA|        2|   4|
    |  X| 14|typeB|        2|   5|
    |  X| 16|typeC|        2|   6|
    |  Y| 17|typeA|        1|   1|
    |  Y| 19|typeB|        1|   2|
    |  Y| 21|typeC|        1|   3|
    |  Y| 18|typeA|        2|   4|
    |  Y| 20|typeB|        2|   5|
    |  Y| 22|typeC|        2|   6|
    +---+---+-----+---------+----+
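
    Since F.sum(F.lit(1)) over an ordered window is just a running count, an equivalent minimal sketch (assuming the same df as above) uses F.row_number() for the inner rank instead:

    from pyspark.sql import Window
    import pyspark.sql.functions as F
    
    # Rank within each (A, type) group: 1 for the first occurrence, 2 for the second
    w1 = Window.partitionBy("A", "type").orderBy("B")
    
    # Ordering by innerRank first interleaves the groups: all first
    # occurrences of typeA/typeB/typeC, then all second occurrences
    w2 = Window.partitionBy("A").orderBy("innerRank", "B")
    
    result = (df
        .withColumn("innerRank", F.row_number().over(w1))
        .withColumn("rank", F.row_number().over(w2))
        .drop("innerRank"))
    result.show()

    Either way, the key design choice is the two-level window: the inner rank records each row's position within its type group, and the outer row_number over (innerRank, B) walks the groups round-robin.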