Tags: sql, apache-spark, apache-spark-sql, window-functions

Look back N months and add them as columns (Spark SQL aggregate)


For every month I need to look back three months and add the previous months' amounts as new columns using withColumn.

val data = Seq(("1","201706","5"),("1","201707","10"),("2","201604","12"),("2","201601","15")).toDF("id","yyyyMM","amount")

+---+------+------+
| id|yyyyMM|amount|
+---+------+------+
|  1|201706|     5|
|  1|201707|    10|
|  2|201604|    12|
|  2|201601|    15|
+---+------+------+

The required output is shown below. For each month we look back three months, which I can do with Spark's window function lag. How can I also add the additional records for months that are not in the input?

+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
|  1|   201708|     0|     201707|     10|     201706|      5|
|  1|   201707|    10|     201706|      5|     201705|      0|
|  1|   201706|     5|     201705|      0|     201704|      0|
|  2|   201606|     0|     201605|      0|     201604|     12|
|  2|   201605|     0|     201604|     12|     201603|      0|
|  2|   201604|    12|     201603|      0|     201602|      0|
|  2|   201603|     0|     201602|      0|     201601|     15|
|  2|   201602|     0|     201601|     15|     201512|      0|
|  2|   201601|    15|     201512|      0|     201511|      0|
+---+---------+------+-----------+-------+-----------+-------+

To clarify: the first records in the table look forward, i.e. a couple of months are added after the last month in the data. I am talking about the following records:

+---+---------+------+-----------+-------+-----------+-------+
| id|yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|  1|   201709|     0|     201708|      0|     201707|     10|
|  1|   201708|     0|     201707|     10|     201706|      5|
+---+---------+------+-----------+-------+-----------+-------+
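The rule behind the expected table can be modelled in plain Scala (no Spark) to make it explicit. This is a sketch under an assumption read off the sample output: every record also produces its next two months, and any month without a record counts as amount 0. The names `ExpectedLookBack`, `Row` and `expected` are hypothetical.

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter

object ExpectedLookBack {
  // One output row: the month, its amount, and the two preceding months with their amounts
  final case class Row(id: String, ym: String, amount: Int,
                       ym1: String, amount2: Int, ym2: String, amount3: Int)

  // "uuuu" is the proleptic year, so no era is needed when parsing "201706"
  private val fmt = DateTimeFormatter.ofPattern("uuuuMM")

  def expected(data: Seq[(String, String, Int)]): Seq[Row] = {
    // (id, month) -> summed amount, only for months that actually have records
    val amounts: Map[(String, YearMonth), Int] =
      data.groupBy { case (id, ym, _) => (id, YearMonth.parse(ym, fmt)) }
        .map { case (key, rows) => key -> rows.map(_._3).sum }

    // Assumption: each recorded month also yields the next two months
    val keys = amounts.keys.toSeq
      .flatMap { case (id, m) => (0 to 2).map(k => (id, m.plusMonths(k))) }
      .distinct

    // Months without a record count as amount 0
    def amt(id: String, m: YearMonth): Int = amounts.getOrElse((id, m), 0)

    keys
      .map { case (id, m) =>
        val m1 = m.minusMonths(1)
        val m2 = m.minusMonths(2)
        Row(id, m.format(fmt), amt(id, m), m1.format(fmt), amt(id, m1), m2.format(fmt), amt(id, m2))
      }
      // Same order as the expected table: id ascending, month descending
      .sortBy(r => (r.id, -r.ym.toInt))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(("1", "201706", 5), ("1", "201707", 10), ("2", "201604", 12), ("2", "201601", 15))
    expected(data).foreach(println)
  }
}
```

With the sample data this yields ten rows (201706 to 201709 for id 1, 201601 to 201606 for id 2), and the previous months are computed by calendar arithmetic, so the oldest rows get 201512/201511 rather than an empty value.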

Solution

  • I don't know if there is a better way, but you need to create the records somewhere; lag does not do that. So first generate the new records from the current ones, then apply the lag function.

    Maybe something like this:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._  // assumes a SparkSession named `spark` is in scope (as in spark-shell)

    // window ordered by month over ALL rows (note: not partitioned by id)
    val orderByWindow = Window.orderBy('yearmonth)

    data
      // convert the string to an actual date
      .withColumn("yearmonth", to_date('yyyyMM, "yyyyMM"))
      // for each record create 2 additional records in the future (with 0 amount);
      // amount is a double in every struct so array() receives identical struct types
      .select(
        explode(array(
          // original record
          struct('id, date_format('yearmonth, "yyyyMM").as("yearmonth"), 'amount.cast("double").as("amount")),
          // 1 month in the future
          struct('id, date_format(add_months('yearmonth, 1), "yyyyMM").as("yearmonth"), lit(0.0).as("amount")),
          // 2 months in the future
          struct('id, date_format(add_months('yearmonth, 2), "yyyyMM").as("yearmonth"), lit(0.0).as("amount"))
        )).as("record"))
      // keep 1 record per month
      .groupBy($"record.yearmonth")
      .agg(
        min($"record.id").as("id"),
        sum($"record.amount").as("amount")
      )
      // final structure (with lag fields)
      .select(
        'id,
        'yearmonth,
        'amount,
        lag('yearmonth, 1).over(orderByWindow).as("yearmonth-1"),
        lag('amount, 1, 0).over(orderByWindow).as("amount2"),
        lag('yearmonth, 2).over(orderByWindow).as("yearmonth-2"),
        lag('amount, 2, 0).over(orderByWindow).as("amount3")
      )
      .orderBy('yearmonth.desc)
    

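    It's not perfect, but it's a start. As the output shows, the window is not partitioned by id, so lag runs across ids: the id 1 rows for 201706 and 201707 pick up 201606 and 201605 from id 2. The oldest rows (201601, 201602) get null instead of a month because there is no earlier row to lag from. Grouping on the month alone would also merge two ids that share a month.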

    +---+---------+------+-----------+-------+-----------+-------+
    |id |yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
    +---+---------+------+-----------+-------+-----------+-------+
    |1  |201709   |0.0   |201708     |0.0    |201707     |10.0   |
    |1  |201708   |0.0   |201707     |10.0   |201706     |5.0    |
    |1  |201707   |10.0  |201706     |5.0    |201606     |0.0    |
    |1  |201706   |5.0   |201606     |0.0    |201605     |0.0    |
    |2  |201606   |0.0   |201605     |0.0    |201604     |12.0   |
    |2  |201605   |0.0   |201604     |12.0   |201603     |0.0    |
    |2  |201604   |12.0  |201603     |0.0    |201602     |0.0    |
    |2  |201603   |0.0   |201602     |0.0    |201601     |15.0   |
    |2  |201602   |0.0   |201601     |15.0   |null       |0.0    |
    |2  |201601   |15.0  |null       |0.0    |null       |0.0    |
    +---+---------+------+-----------+-------+-----------+-------+
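One way to address those issues while keeping the same explode-then-lag idea is sketched below, assuming Spark 2.2+ (for `to_date(col, format)`); the object name `LookBackPerId` and the helper `amountMonthsAgo` are hypothetical. It groups by `(id, month)`, partitions the window by id, computes `yearmonth-1`/`yearmonth-2` with `add_months` so the oldest rows get a month instead of null, and accepts a lagged amount only when that row is exactly k calendar months earlier.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LookBackPerId {
  def lookBack(data: DataFrame): DataFrame = {
    val months = data
      // parse "yyyyMM" into a date (first day of the month) and make amount numeric
      .withColumn("month", to_date(col("yyyyMM"), "yyyyMM"))
      .withColumn("amount", col("amount").cast("int"))
      // each record plus the next two months with amount 0
      .select(col("id"), explode(array(
        struct(col("month"), col("amount")),
        struct(add_months(col("month"), 1).as("month"), lit(0).as("amount")),
        struct(add_months(col("month"), 2).as("month"), lit(0).as("amount"))
      )).as("r"))
      // one row per (id, month); grouping on the month alone would merge ids
      .groupBy(col("id"), col("r.month").as("month"))
      .agg(sum(col("r.amount")).as("amount"))

    // partitioning by id keeps lag from reading another id's rows
    val w = Window.partitionBy("id").orderBy("month")

    // amount k months back: use the row k positions earlier only if it is exactly
    // k calendar months earlier, otherwise 0 (a null lag also falls through to 0)
    def amountMonthsAgo(k: Int): Column =
      when(lag(col("month"), k).over(w) === add_months(col("month"), -k), lag(col("amount"), k).over(w))
        .otherwise(lit(0L))

    def fmt(c: Column): Column = date_format(c, "yyyyMM")

    months.select(
      col("id"),
      fmt(col("month")).as("yearmonth"),
      col("amount"),
      fmt(add_months(col("month"), -1)).as("yearmonth-1"),
      amountMonthsAgo(1).as("amount2"),
      fmt(add_months(col("month"), -2)).as("yearmonth-2"),
      amountMonthsAgo(2).as("amount3")
    ).orderBy(col("id"), col("yearmonth").desc)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("look-back-months").getOrCreate()
    import spark.implicits._
    val data = Seq(("1", "201706", "5"), ("1", "201707", "10"), ("2", "201604", "12"), ("2", "201601", "15"))
      .toDF("id", "yyyyMM", "amount")
    lookBack(data).show(false)
    spark.stop()
  }
}
```

The row-offset check is enough here because every recorded month is followed by its next two months: whenever a non-zero amount lies k ≤ 2 months back, every month in between exists, so the row k positions earlier is that month. For larger look-back windows a join on `add_months(month, -k)` would be the safer choice.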