How to drop duplicate records that have the same value in a specific column and retain the one with the highest timestamp using PySpark


I have tried the code below. The idea is to order the records by id and processed_timestamp, with processed_timestamp in descending order, so that dropDuplicates keeps the latest record for each id. But when I run the query, the records are not arranged in descending order by processed_timestamp. It even drops the latest record and retains the older one, which should not be the case.

df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])

I also tried the approach below, but when I converted the result back to a DataFrame, the schema was different and the records ended up in a single column, separated by commas. This approach also drops the latest record and retains the older one, which should not be the case.

def selectRowByTimeStamp(x, y):
    # Keep whichever of the two rows has the later processed_timestamp
    if x.processed_timestamp > y.processed_timestamp:
        return x
    return y

# Key each row by its id, then reduce each group to a single row
dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp)

I'm not sure I correctly understand how the above code works.


Solution

  • If it were not for a simple mistake, your code would work as expected.

    You should not use the column name "processed_timestamp" twice:

    df2 = df_concat.orderBy(
        "id", f.col("processed_timestamp").desc()
    ).dropDuplicates(["id"])
    

    Your code was sorting the DataFrame by processed_timestamp in ascending order, because the raw column name comes before the descending expression, so the ascending sort takes precedence. That is why dropDuplicates kept the older record for each id.
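
  • Note that dropDuplicates makes no formal guarantee about which duplicate survives after a sort, so if you want "the newest row per id" to be fully deterministic, a window function is a common alternative. A minimal sketch, assuming only the df_concat DataFrame from your question:

    from pyspark.sql import functions as f
    from pyspark.sql.window import Window

    # Rank the rows within each id, newest processed_timestamp first
    w = Window.partitionBy("id").orderBy(f.col("processed_timestamp").desc())

    df2 = (
        df_concat
        .withColumn("rn", f.row_number().over(w))
        .filter(f.col("rn") == 1)  # keep only the newest row per id
        .drop("rn")
    )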
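
  • As for your reduceByKey attempt: the comparison logic is correct, but reduceByKey returns an RDD of (id, Row) pairs, which is likely why the converted DataFrame packs each original record into a single comma-separated column. Drop the keys before rebuilding the DataFrame. A sketch, assuming an active SparkSession named spark:

    # reduceByKey yields (key, value) pairs; keep only the Row values
    rows = newdata.values()

    # Rebuild the DataFrame using the original schema
    df2 = spark.createDataFrame(rows, schema=df_concat.schema)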