I have tried the code below. The idea of the script is to order the records by id and by processed_timestamp in descending order, but when I run the query it does not arrange the records in descending order of processed_timestamp. It even drops the latest record and retains the older one, which should not be the case:
df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])
I also tried the approach below, but when I tried to convert the result back to a DataFrame, the schema was different: the records now reside in a single column, separated by commas. It also drops the latest record and retains the older one, which should not be the case:
def selectRowByTimeStamp(x, y):
    if x.processed_timestamp > y.processed_timestamp:
        return x
    return y
dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp)
I'm not sure if I'm correctly understanding how the above code works.
If it were not for a simple mistake, your code would work as expected. You should not pass the column "processed_timestamp" twice:
df2 = df_concat.orderBy(
    "id", f.col("processed_timestamp").desc()
).dropDuplicates(["id"])
Your code was sorting the DataFrame by processed_timestamp in ascending order: the raw column name (ascending by default) came first, so the later .desc() expression only acted as a tiebreaker and never took effect.