Tags: python, apache-spark, join, pyspark

How to use a window function in a PySpark DataFrame


I have a PySpark DataFrame as below:

Mail            sno     mail_date       date1       present
[email protected]     790     2024-01-01      2024-02-06  yes
[email protected]     790     2023-12-23      2023-01-01  
[email protected]     101     2022-02-23                  
[email protected]     101     2021-01-20      2022-07-09  yes

In the final DataFrame, I need one record per sno with the max value of each date column, and the present value that corresponds to the max mail_date. So the final DataFrame should look like:

Mail            sno     mail_date       date1       present
[email protected]     790     2024-01-01      2024-02-06  yes
[email protected]     101     2022-02-23      2022-07-09      

I have the following code:

from pyspark.sql import Window, functions as F

windowSpec = Window.partitionBy('Mail', 'sno')

df = df.withColumn('max_mail_date', F.max('mail_date').over(windowSpec))\
       .withColumn('max_date1', F.max('date1').over(windowSpec))

df1 = df.withColumn('mail_date', F.when(F.col('mail_date').isNotNull(), F.col('max_mail_date')).otherwise(F.col('mail_date')))\
        .drop('max_mail_date').dropDuplicates()

Here, I'm not getting the expected values in the present column. Please suggest any changes.


Solution

  • Even though @DerekO's answer seems correct, I'd like to take another approach on this.

    I prefer using the groupBy().agg() approach with max and struct. This approach is more efficient in terms of data reduction, especially when working with large datasets.

    So, considering the data you have provided:
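
    To make this easy to run, here is a minimal sketch that first recreates that sample data (the assumption being that the date columns are plain strings and the blank cells are nulls; adjust if your columns are real date types):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Sample data from the question; blank cells are represented as None.
    df = spark.createDataFrame(
        [
            ("[email protected]", 790, "2024-01-01", "2024-02-06", "yes"),
            ("[email protected]", 790, "2023-12-23", "2023-01-01", None),
            ("[email protected]", 101, "2022-02-23", None, None),
            ("[email protected]", 101, "2021-01-20", "2022-07-09", "yes"),
        ],
        ["Mail", "sno", "mail_date", "date1", "present"],
    )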

    # Pairing mail_date with present means max() keeps the present value of the latest mail_date.
    df_result = df.withColumn(
        "mail_date_struct", 
        F.struct(F.col("mail_date").alias("max_mail_date"), "present")
    ).groupBy("Mail", "sno").agg(
        F.max("mail_date_struct").alias("max_mail_date_struct"),
        F.max("date1").alias("max_date1")
    ).select(
        "Mail", 
        "sno",
        "max_mail_date_struct.max_mail_date",
        "max_mail_date_struct.present",
        "max_date1"
    )
    
    df_result.show()
    # +-----------+---+-------------+-------+----------+
    # |       Mail|sno|max_mail_date|present| max_date1|
    # +-----------+---+-------------+-------+----------+
    # |[email protected]|790|   2024-01-01|    yes|2024-02-06|
    # |[email protected]|101|   2022-02-23|   NULL|2022-07-09|
    # +-----------+---+-------------+-------+----------+
    

    Considerations:

    The choice between the approach that I'm suggesting and the window functions approach depends on your specific use case, dataset characteristics and performance considerations.

    For large datasets where you aim to reduce data volume, aggregation might be more efficient. However, if you need to keep the original dataset's structure while computing partition-wise metrics, window functions could be more appropriate, although potentially at a higher computational cost.
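
    For completeness, if you do want to stay with window functions, here is a rough sketch of how the present column could be carried along with the latest mail_date. It reuses the same max-over-a-struct idea, just evaluated over a window instead of a groupBy (column names are taken from your question):

    from pyspark.sql import Window, functions as F

    windowSpec = Window.partitionBy("Mail", "sno")

    # max() over a struct keeps the present value of the row with the latest mail_date.
    df_win = (
        df.withColumn("max_pair", F.max(F.struct("mail_date", "present")).over(windowSpec))
          .withColumn("max_date1", F.max("date1").over(windowSpec))
          .select(
              "Mail",
              "sno",
              F.col("max_pair.mail_date").alias("mail_date"),
              F.col("max_pair.present").alias("present"),
              F.col("max_date1").alias("date1"),
          )
          .dropDuplicates()
    )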

    Always consider testing both approaches on a subset of your data to understand the performance implications in your specific environment.
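
    As a rough starting point for that comparison, you can inspect the physical plans of both variants (df_result and df_win refer to the two sketches above) in addition to timing them on a sample:

    # Compare the physical plans of the two variants.
    df_result.explain()
    df_win.explain()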