Tags: apache-spark, pyspark, hive

How to remove duplicate entries in a dataframe in Spark?



The following is the table I have.

|    Path    | Name | Value 1 | Value 2 |  E-mail  |
|:----------:|:----:|:-------:|:-------:|:--------:|
| /project/a | a    |  10000  |  8555   | [email protected]|
| /project/b | b    |  4000   |  7549   | [email protected]|
| /project/c | c    |  4571   |  4478   | [email protected]|
| /project/a | a    | -1.0    |  0.0    | [email protected]|

Schema:
Path - string
Name - string
Value 1 - float
Value 2 - float
E-mail - string
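
For reference, a dataframe matching this schema could be built like the sketch below (the SparkSession variable name `spark` and the literal values are just illustrative; the e-mail addresses are placeholders as in the table):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Sample rows mirroring the table above; e-mail values are placeholders
    df = spark.createDataFrame(
        [
            ('/project/a', 'a', 10000.0, 8555.0, '[email protected]'),
            ('/project/b', 'b', 4000.0, 7549.0, '[email protected]'),
            ('/project/c', 'c', 4571.0, 4478.0, '[email protected]'),
            ('/project/a', 'a', -1.0, 0.0, '[email protected]'),
        ],
        ['Path', 'Name', 'Value 1', 'Value 2', 'E-mail'],
    )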


Now I want to completely filter out the duplicate Name row (a appears again) before sending the report to the concerned person in the E-mail column.


I have tried using a conditional (if statement), but it is not working. What should I do?

if (project["Name"] == 'a' and project["Value 1"] == '-1.0'):
    continue
else:
    mail_notification(sender,receiver,subject)

Solution

  • If you want to remove duplicates from a dataframe based on a column (or set of columns), you can use dropDuplicates. Note that it may not drop the second occurrence: which row is kept is not deterministic, so this should only be used when the remaining fields are either identical or irrelevant. E.g.:

    deduplicated = df.dropDuplicates(['Name'])
    

    If you want an ordering preference for the deduplication, you can use window functions (row_number over a Window). For example, to keep the row with the highest "Value 1" in each duplicate group:

    import pyspark.sql.functions as f
    from pyspark.sql import Window
    
    window = Window.partitionBy('Name').orderBy(f.desc('Value 1'))
    deduplicated = df.withColumn('row_number', f.row_number().over(window))\
        .filter(f.col('row_number') == 1)\
        .drop('row_number')
    

    If you just want to drop the negative values because they are not valid data, the easiest approach is to filter:

    import pyspark.sql.functions as f
    filtered = df.filter(f.col('Value 1') >= 0)
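
    Putting it together with the notification step from the question: after filtering/deduplicating, you can collect the remaining rows on the driver and notify each recipient. This is only a sketch; mail_notification, sender and subject are taken from the question and assumed to be defined elsewhere:

    import pyspark.sql.functions as f

    # Drop the invalid negative row, then keep one row per Name
    valid = df.filter(f.col('Value 1') >= 0).dropDuplicates(['Name'])

    # collect() is fine here because the report only has a handful of rows
    for row in valid.collect():
        mail_notification(sender, row['E-mail'], subject)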