Tags: python, python-3.x, pyspark, rdd

Add unique identifier (Serial No.) for consecutive column values in pyspark


I created a DataFrame using:

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, concat, lit

df = pd.DataFrame({
    "b": ['A','A','A','A','A','A','A','B','B','B','C','C','D','D','D','D','D','D','D','D','D','D','D'],
    "Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23],
    "a": [3,-4,2,-1,-3,1,-7,-6,-4,-5,-1,1,4,5,-3,2,-5,4,-4,-2,5,-5,-4]
})

df2 = spark.createDataFrame(df)

df2 = df2.withColumn("pos_neg", col("a") < 0)   # True where 'a' is negative
df2 = df2.withColumn("huyguyg", concat(col("b"), lit(" "), col("pos_neg")))

+---+---+---+-------+---+-------+
|  b|Sno|  a|pos_neg|val|huyguyg|
+---+---+---+-------+---+-------+
|  B|  8| -6|   true|  1| B true|
|  B|  9| -4|   true|  1| B true|
|  B| 10| -5|   true|  1| B true|
|  D| 13|  4|  false|  0|D false|
|  D| 14|  5|  false|  0|D false|
|  D| 15| -3|   true|  1| D true|
|  D| 16|  2|  false|  1|D false|
|  D| 17| -5|   true|  2| D true|
|  D| 18|  4|  false|  2|D false|
|  D| 19| -4|   true|  3| D true|
|  D| 20| -2|   true|  3| D true|
|  D| 21|  5|  false|  3|D false|
|  D| 22| -5|   true|  4| D true|
|  D| 23| -4|   true|  4| D true|
|  C| 11| -1|   true|  1| C true|
|  C| 12|  1|  false|  1|C false|
|  A|  1|  3|  false|  0|A false|
|  A|  2| -4|   true|  1| A true|
|  A|  3|  2|  false|  1|A false|
|  A|  4| -1|   true|  2| A true|
+---+---+---+-------+---+-------+

I want an additional column at the end that assigns a unique identifier (serial no.) to each run of consecutive values. For instance, the starting value in column 'huyguyg' is 'B true', so it can get the number 1; since the next two values are also 'B true', they also get 1. The serial number then increases with each new run and stays constant while the 'huyguyg' value is unchanged.

Any support in this regard would be helpful. A lag function may be useful here (it can flag the rows where 'pos_neg' changes), but I am not able to sum those flags into a running serial number:

df2 = df2.withColumn("serial no.", (df2.pos_neg != F.lag('pos_neg').over(w)).cast('int'))  # w is a Window spec, e.g. ordered by 'Sno'
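What the question is after is a running sum of those change flags (sometimes called gaps-and-islands numbering). A minimal sketch of the idea in plain Python, independent of Spark:

```python
# Gaps-and-islands numbering: flag each row where the value changes
# from the previous row, then keep a running sum of the flags so every
# consecutive run of equal values shares one serial number.
def run_serial_numbers(values):
    serials = []
    serial = 0
    prev = object()          # sentinel that never equals a real value
    for v in values:
        if v != prev:        # value changed: start a new run
            serial += 1
        serials.append(serial)
        prev = v
    return serials

huyguyg = ['B true', 'B true', 'B true', 'D false', 'D false', 'D true']
print(run_serial_numbers(huyguyg))  # [1, 1, 1, 2, 2, 3]
```

In PySpark the same idea would be the lag comparison above followed by a cumulative sum, e.g. `F.sum('serial no.').over(Window.orderBy('Sno').rowsBetween(Window.unboundedPreceding, 0))`; note this assumes a column such as 'Sno' that defines the row order.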

Solution

  • Simple! Just use the dense_rank function with an orderBy clause. Here is how it looks:

    from pyspark.sql.functions import dense_rank
    
    df3 = df2.withColumn("denseRank", dense_rank().over(Window.orderBy(df2.huyguyg)))
    
    +---+---+---+-------+-------+---------+
    |Sno|  a|  b|pos_neg|huyguyg|denseRank|
    +---+---+---+-------+-------+---------+
    |  1|  3|  A|  false|A false|        1|
    |  3|  2|  A|  false|A false|        1|
    |  6|  1|  A|  false|A false|        1|
    |  2| -4|  A|   true| A true|        2|
    |  4| -1|  A|   true| A true|        2|
    |  5| -3|  A|   true| A true|        2|
    |  7| -7|  A|   true| A true|        2|
    |  8| -6|  B|   true| B true|        3|
    |  9| -4|  B|   true| B true|        3|
    | 10| -5|  B|   true| B true|        3|
    | 12|  1|  C|  false|C false|        4|
    | 11| -1|  C|   true| C true|        5|
    | 13|  4|  D|  false|D false|        6|
    | 14|  5|  D|  false|D false|        6|
    | 16|  2|  D|  false|D false|        6|
    | 18|  4|  D|  false|D false|        6|
    | 21|  5|  D|  false|D false|        6|
    | 15| -3|  D|   true| D true|        7|
    | 17| -5|  D|   true| D true|        7|
    | 19| -4|  D|   true| D true|        7|
    +---+---+---+-------+-------+---------+
    only showing top 20 rows
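For intuition, dense_rank assigns each distinct ordered value the next consecutive rank, with no gaps after ties. A small plain-Python sketch of that ranking rule (not the Spark implementation itself):

```python
# dense_rank: rows with equal values share a rank, and ranks are
# consecutive (no gaps after ties), matching the denseRank column above.
def dense_rank_over(values):
    order = sorted(set(values))                 # distinct values, ordered
    rank_of = {v: i + 1 for i, v in enumerate(order)}
    return [rank_of[v] for v in values]

vals = ['A false', 'A true', 'B true', 'C false', 'C true', 'C false']
print(dense_rank_over(vals))  # [1, 2, 3, 4, 5, 4]
```

Note that this ranks by value, so identical 'huyguyg' values anywhere in the ordering share one rank, as the table above shows.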