Tags: sql, apache-spark, pyspark, apache-spark-sql

Spark: need confirmation on approach to capturing first and last date on a dataset


I have a data frame:

A, B, C, D, 201701, 2020001
A, B, C, D, 201801, 2020002
A, B, C, D, 201901, 2020003

Expected output:

col_A, col_B, col_C, col_D, min_week, max_week, min_month, max_month
A,     B,     C,     D,     201701,   201901,   2020001,   2020003

What I tried in PySpark:

from pyspark.sql import Window
import pyspark.sql.functions as psf

w1 = Window.partitionBy('A', 'B', 'C', 'D') \
    .orderBy('WEEK', 'MONTH')

df_new = df_source \
    .withColumn("min_week", psf.first("WEEK").over(w1)) \
    .withColumn("max_week", psf.last("WEEK").over(w1)) \
    .withColumn("min_month", psf.first("MONTH").over(w1)) \
    .withColumn("max_month", psf.last("MONTH").over(w1))

What I also tried:

sql_1 = """
select A, B , C, D, first(WEEK) as min_week, 
last(WEEK) as max_week , first(MONTH) as min_month, 
last(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)

Using the first and second approaches I got inconsistent results. Will the approach below fix the issue encountered above?

sql_1 = """
select A, B , C, D, min(WEEK) as min_week, 
max(WEEK) as max_week , min(MONTH) as min_month, 
max(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)

Which approach works reliably in PySpark every time? Is there an alternative way?


Solution

  • The third approach you propose will work every time. You could also write it like this:

    import pyspark.sql.functions as F

    (df
        .groupBy('A', 'B', 'C', 'D')
        .agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
             F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
        .show())
    

    which yields:

    +---+---+---+---+--------+--------+---------+---------+
    |  A|  B|  C|  D|min_week|max_week|min_month|max_month|
    +---+---+---+---+--------+--------+---------+---------+
    |  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
    +---+---+---+---+--------+--------+---------+---------+
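
    For reference, here is a minimal sketch of how the input DataFrame `df` used in these examples could be built. It assumes a SparkSession named `spark` (usually already available in a notebook or spark-submit job) and takes the column names `WEEK` and `MONTH` from the code in your question:

    from pyspark.sql import SparkSession

    # In most environments a SparkSession called `spark` already exists;
    # getOrCreate() reuses it if so.
    spark = SparkSession.builder.getOrCreate()

    # Column names WEEK and MONTH are assumed from the question's code.
    df = spark.createDataFrame(
        [('A', 'B', 'C', 'D', 201701, 2020001),
         ('A', 'B', 'C', 'D', 201801, 2020002),
         ('A', 'B', 'C', 'D', 201901, 2020003)],
        ['A', 'B', 'C', 'D', 'WEEK', 'MONTH'])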
    

    It is interesting to understand why the first two approaches produce unpredictable results while the third always works.

    The second approach is unpredictable because Spark is a parallel computation engine. When it aggregates a value, it first aggregates within each partition, and the partial results are then combined two by two. The order of these combinations is not deterministic: it depends, among other things, on the order in which tasks complete, which can change on every attempt, especially when there is a lot of data. Since `first` and `last` depend on row order, and no ordering is guaranteed inside a `GROUP BY`, the values they return can change between runs.
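
    As a rough illustration (not a guaranteed reproduction, since the non-determinism depends on partitioning and task scheduling), changing the physical layout of the data can change which rows `first` and `last` happen to see:

    import pyspark.sql.functions as F

    # first()/last() in a regular aggregation depend on the row order inside each
    # partition, which Spark does not guarantee. Repartitioning (or simply a
    # different task completion order) can therefore change the result.
    df.repartition(8).groupBy('A', 'B', 'C', 'D') \
        .agg(F.first('WEEK').alias('first_week'),
             F.last('WEEK').alias('last_week')) \
        .show()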

    The first approach is not exactly what you want to do. Window functions do not aggregate the dataframe into a single row; they compute the aggregation and add it to every row. There is also a subtle mistake: when a window has an `orderBy`, Spark by default uses a frame ranging from the start of the partition to the current row. As a result, `last("WEEK")` returns the current row's week, not the partition's maximum. To compute the min and the max you do not need to order the window at all; without an `orderBy`, the frame covers the whole partition:

    # No orderBy: the frame defaults to the whole partition, so min/max see every row.
    w = Window.partitionBy('A', 'B', 'C', 'D')
    df.select('A', 'B', 'C', 'D',
        F.min('WEEK').over(w).alias('min_week'),
        F.max('WEEK').over(w).alias('max_week'),
        F.min('MONTH').over(w).alias('min_month'),
        F.max('MONTH').over(w).alias('max_month')
    ).show()
    

    which yields the correct values on every row, although not the single-row result you were expecting. At least it shows the difference between window aggregations and regular aggregations:

    +---+---+---+---+--------+--------+---------+---------+
    |  A|  B|  C|  D|min_week|max_week|min_month|max_month|
    +---+---+---+---+--------+--------+---------+---------+
    |  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
    |  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
    |  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
    +---+---+---+---+--------+--------+---------+---------+
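
    As an alternate way, if you do want to keep an ordered window (for example because you also need order-dependent functions on it), you can make `first`/`last` behave deterministically by explicitly widening the frame to the whole partition. A sketch, using the same `df` as above:

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    # With an explicit frame covering the entire partition, last() sees every row
    # of the group instead of stopping at the current row.
    w2 = Window.partitionBy('A', 'B', 'C', 'D') \
        .orderBy('WEEK', 'MONTH') \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    df_new = df \
        .withColumn("min_week", F.first("WEEK").over(w2)) \
        .withColumn("max_week", F.last("WEEK").over(w2)) \
        .withColumn("min_month", F.first("MONTH").over(w2)) \
        .withColumn("max_month", F.last("MONTH").over(w2))

    Note that, like any window aggregation, this still keeps one row per input row; to collapse to a single row per group you would need something like `.select('A', 'B', 'C', 'D', 'min_week', 'max_week', 'min_month', 'max_month').distinct()`, so the plain groupBy with min/max shown above remains the simplest choice.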