python | dataframe | pyspark | apache-spark-sql

How to assign a monotonically increasing number as a suffix to only duplicates in a column?


Input: a PySpark DataFrame whose firstname column contains duplicates

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|James    |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Jen      |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

Expected Output: I want to assign a suffix only to the duplicates in the column so that all values become unique.

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James1   |          |Smith   |36636|M     |3000  |
|James2   |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Jen1     |Anne      |Jones   |39192|F     |4000  |
|Jen2     |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

This is a working solution:

from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Number each row within its firstname group
window_spec = Window.partitionBy("firstname").orderBy("firstname")
df2 = df.withColumn("occurence", f.row_number().over(window_spec))

# Append the group-wise row number to the name
@udf(StringType())
def set_occurence(firstname, occurence):
    return firstname + str(occurence)

df2 = df2.withColumn('firstname', set_occurence('firstname', 'occurence')).drop('occurence')
df2.show()

This produces the output:

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|   James1|          |   Smith|36636|     M|  3000|
|   James2|      Rose|        |40288|     M|  4000|
|     Jen1|      Anne|   Jones|39192|     F|  4000|
|     Jen2|      Mary|   Brown|     |     F|    -1|
|  Robert1|          |Williams|42114|     M|  4000|
+---------+----------+--------+-----+------+------+

However, I feel I have to repartition the DataFrame, which might be costly since my actual use case involves a large DataFrame.

Any other ideas?


Solution

  • Don't use UDFs. They are inefficient and should be avoided whenever possible. In this case the result can be achieved with native Spark SQL functions.

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    W = Window.partitionBy('firstname').orderBy('firstname')
    row_num = F.row_number().over(W)              # position within each firstname group
    has_dupes = F.count(F.lit(1)).over(W) > 1     # True only when the firstname repeats
    firstname = F.when(has_dupes, F.concat('firstname', row_num)).otherwise(F.col('firstname'))

    df1 = df.withColumn('firstname', firstname)

    df1.show()
    +---------+----------+--------+-----+------+------+
    |firstname|middlename|lastname|   id|gender|salary|
    +---------+----------+--------+-----+------+------+
    |   James1|          |   Smith|36636|     M|  3000|
    |   James2|      Rose|        |40288|     M|  4000|
    |     Jen1|      Anne|   Jones|39192|     F|  4000|
    |     Jen2|      Mary|   Brown|     |     F|    -1|
    |   Robert|          |Williams|42114|     M|  4000|
    +---------+----------+--------+-----+------+------+
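
    For completeness, below is a minimal, self-contained sketch that rebuilds the sample data and applies the same window logic end to end. The local SparkSession setup and the explicit .cast('string') on the row number are assumptions added for illustration; the core expressions mirror the answer above.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    # Local session and the sample rows from the question (assumed setup).
    spark = SparkSession.builder.master('local[*]').getOrCreate()
    columns = ['firstname', 'middlename', 'lastname', 'id', 'gender', 'salary']
    data = [
        ('James', '', 'Smith', '36636', 'M', 3000),
        ('James', 'Rose', '', '40288', 'M', 4000),
        ('Robert', '', 'Williams', '42114', 'M', 4000),
        ('Jen', 'Anne', 'Jones', '39192', 'F', 4000),
        ('Jen', 'Mary', 'Brown', '', 'F', -1),
    ]
    df = spark.createDataFrame(data, columns)

    W = Window.partitionBy('firstname').orderBy('firstname')
    row_num = F.row_number().over(W)              # position within each firstname group
    has_dupes = F.count(F.lit(1)).over(W) > 1     # more than one row with this firstname?
    firstname = (F.when(has_dupes, F.concat('firstname', row_num.cast('string')))
                  .otherwise(F.col('firstname')))

    df.withColumn('firstname', firstname).show()

    Because both window expressions share the same window spec, Spark can evaluate them together after a single shuffle by firstname, so there is no separate repartition step or Python UDF round trip.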