Tags: scala, apache-spark, windowing

How to find the next occurring item after the current row in a DataFrame using Spark windowing?


I have the following Dataframe:

+------+----------+-------------+--------------------+---------+-----+----------+
|ID    |MEM_ID    | BFS         | SVC_DT             |TYP      |SEQ  |BFS_SEQ   |
+------+----------+-------------+--------------------+---------+-----+----------+
|105771|29378668  | BRIMONIDINE | 2019-02-04 00:00:00|PD       |1    |1         |
|105772|29378668  | BRIMONIDINE | 2019-04-04 00:00:00|PD       |2    |2         |
|105773|29378668  | BRIMONIDINE | 2019-04-17 00:00:00|RV       |3    |3         |
|105774|29378668  | TIMOLOL     | 2019-04-17 00:00:00|RV       |4    |1         |
|105775|29378668  | BRIMONIDINE | 2019-04-22 00:00:00|PD       |5    |4         |
|105776|29378668  | TIMOLOL     | 2019-04-22 00:00:00|PD       |6    |2         |
+------+----------+-------------+--------------------+---------+-----+----------+

For every row, I need to find the next row with TYP 'PD' in the same BFS, after the current row, and populate its ID in a new column named 'NEXT_PD_TYP_ID'.

The output I am expecting is:

+------+---------+-------------+--------------------+----+-----+---------+---------------+
|ID    |MEM_ID   | BFS         | SVC_DT             |TYP |SEQ  |BFS_SEQ  |NEXT_PD_TYP_ID |
+------+---------+-------------+--------------------+----+-----+---------+---------------+
|105771|29378668 | BRIMONIDINE | 2019-02-04 00:00:00|PD  |1    |1        |105772         |
|105772|29378668 | BRIMONIDINE | 2019-04-04 00:00:00|PD  |2    |2        |105775         | 
|105773|29378668 | BRIMONIDINE | 2019-04-17 00:00:00|RV  |3    |3        |105775         |
|105774|29378668 | TIMOLOL     | 2019-04-17 00:00:00|RV  |4    |1        |105776         |
|105775|29378668 | BRIMONIDINE | 2019-04-22 00:00:00|PD  |5    |4        |null           | 
|105776|29378668 | TIMOLOL     | 2019-04-22 00:00:00|PD  |6    |2        |null           |
+------+---------+-------------+--------------------+----+-----+---------+---------------+
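To pin down the rule, the expected NEXT_PD_TYP_ID values can be reproduced with a plain-Scala reference sketch (no Spark). It assumes "next" means the next row by SEQ within the same BFS, excluding the current row; the names NextPdReference, Rec and nextPdIds are hypothetical.

```scala
object NextPdReference {
  // Hypothetical record type holding only the columns the rule needs.
  final case class Rec(id: String, bfs: String, typ: String, seq: Int)

  // For each row ID: the ID of the next "PD" row in the same BFS (ordered by SEQ),
  // or None when no later PD row exists. Quadratic per group, fine for a spec.
  def nextPdIds(rows: Seq[Rec]): Map[String, Option[String]] =
    rows
      .groupBy(_.bfs)
      .values
      .flatMap { group =>
        val sorted = group.sortBy(_.seq)
        sorted.zipWithIndex.map { case (row, i) =>
          // Only rows strictly after the current one are candidates.
          row.id -> sorted.drop(i + 1).find(_.typ == "PD").map(_.id)
        }
      }
      .toMap

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Rec("105771", "BRIMONIDINE", "PD", 1),
      Rec("105772", "BRIMONIDINE", "PD", 2),
      Rec("105773", "BRIMONIDINE", "RV", 3),
      Rec("105774", "TIMOLOL", "RV", 4),
      Rec("105775", "BRIMONIDINE", "PD", 5),
      Rec("105776", "TIMOLOL", "PD", 6)
    )
    // Print in ID order, using "null" for rows with no later PD row.
    nextPdIds(rows).toSeq.sortBy(_._1).foreach { case (id, next) =>
      println(s"$id -> ${next.getOrElse("null")}")
    }
  }
}
```

Running main prints 105771 -> 105772, 105772 -> 105775, 105773 -> 105775, 105774 -> 105776, 105775 -> null and 105776 -> null, which matches the expected table.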

Need help.

I have tried conditional aggregation, max(when(...)), but since there is more than one 'PD' row, max returns the same single value for all the rows.
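The conditional-aggregation idea can still work if the window frame is limited to the rows after the current one and min is used instead of max, so the nearest PD row wins. A minimal sketch, assuming SEQ defines the order within each BFS; the names NextPdWithMin and addNextPd are hypothetical. Wrapping SEQ and ID in a struct makes min pick the earliest following PD row by SEQ rather than the smallest ID.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object NextPdWithMin {
  def addNextPd(df: DataFrame): DataFrame = {
    // Frame: from the row after the current one to the end of the BFS partition.
    val following = Window
      .partitionBy("BFS")
      .orderBy("SEQ")
      .rowsBetween(1, Window.unboundedFollowing)
    // when(...) is null for non-PD rows and min ignores nulls; structs compare
    // field by field, so the smallest (SEQ, ID) is the nearest following PD row.
    val nearestPd = min(when(col("TYP") === "PD", struct(col("SEQ"), col("ID")))).over(following)
    // An empty frame (last row of a partition) or no later PD row gives null.
    df.withColumn("NEXT_PD_TYP_ID", nearestPd.getField("ID"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("next-pd-min").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("105771", "BRIMONIDINE", "PD", 1),
      ("105772", "BRIMONIDINE", "PD", 2),
      ("105773", "BRIMONIDINE", "RV", 3),
      ("105774", "TIMOLOL", "RV", 4),
      ("105775", "BRIMONIDINE", "PD", 5),
      ("105776", "TIMOLOL", "PD", 6)
    ).toDF("ID", "BFS", "TYP", "SEQ")
    addNextPd(df).orderBy("ID").show(false)
    spark.stop()
  }
}
```

The key difference from a plain max(when(...)) over the whole partition is the rowsBetween(1, ...) frame, which gives each row its own set of candidate rows.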

No error messages


Solution

  • I hope this helps. I created a new column, TYPPDID, that holds the ID of rows where TYP === "PD" (and null otherwise). Then I used a window partitioned by BFS and ordered by ID, with a frame ranging from the next row to unbounded following, and took the first non-null TYPPDID. The orderBy("ID") at the end is only there to show the records in order.

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    // TYPPDID holds the row's ID when TYP is "PD", and null otherwise
    val df = Seq(
      ("105771", "BRIMONIDINE", "PD"),
      ("105772", "BRIMONIDINE", "PD"),
      ("105773", "BRIMONIDINE", "RV"),
      ("105774", "TIMOLOL", "RV"),
      ("105775", "BRIMONIDINE", "PD"),
      ("105776", "TIMOLOL", "PD")
    ).toDF("ID", "BFS", "TYP").withColumn("TYPPDID", when($"TYP" === "PD", $"ID"))
    df: org.apache.spark.sql.DataFrame = [ID: string, BFS: string ... 2 more fields]
    
    scala> df.show
    +------+-----------+---+-------+
    |    ID|        BFS|TYP|TYPPDID|
    +------+-----------+---+-------+
    |105771|BRIMONIDINE| PD| 105771|
    |105772|BRIMONIDINE| PD| 105772|
    |105773|BRIMONIDINE| RV|   null|
    |105774|    TIMOLOL| RV|   null|
    |105775|BRIMONIDINE| PD| 105775|
    |105776|    TIMOLOL| PD| 105776|
    +------+-----------+---+-------+
    
    
    scala> val overColumns = Window.partitionBy("BFS").orderBy("ID").rowsBetween(1, Window.unboundedFollowing)
    overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@eb923ef
    
    
    scala> df.withColumn("NEXT_PD_TYP_ID",first("TYPPDID", true).over(overColumns)).orderBy("ID").show(false)
    +------+-----------+---+-------+--------------+
    |ID    |BFS        |TYP|TYPPDID|NEXT_PD_TYP_ID|
    +------+-----------+---+-------+--------------+
    |105771|BRIMONIDINE|PD |105771 |105772        |
    |105772|BRIMONIDINE|PD |105772 |105775        |
    |105773|BRIMONIDINE|RV |null   |105775        |
    |105774|TIMOLOL    |RV |null   |105776        |
    |105775|BRIMONIDINE|PD |105775 |null          |
    |105776|TIMOLOL    |PD |105776 |null          |
    +------+-----------+---+-------+--------------+
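The REPL session above relies on spark-shell pre-importing the session implicits. Outside the shell, the same first-non-null-over-following-rows approach can be packaged as a standalone sketch. Two assumptions differ from the answer: it orders by SEQ (the answer orders by ID, which gives the same result here because IDs increase with SEQ), and it drops the helper column afterwards. The names NextPdTypId and addNextPdTypId are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object NextPdTypId {
  def addNextPdTypId(df: DataFrame): DataFrame = {
    // Rows strictly after the current one, within the same BFS.
    val following = Window
      .partitionBy("BFS")
      .orderBy("SEQ") // assumption: SEQ defines "next"
      .rowsBetween(1, Window.unboundedFollowing)
    df.withColumn("TYPPDID", when(col("TYP") === "PD", col("ID"))) // ID only on PD rows
      .withColumn("NEXT_PD_TYP_ID", first(col("TYPPDID"), true).over(following)) // true = ignoreNulls
      .drop("TYPPDID")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("next-pd-typ-id").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("105771", "BRIMONIDINE", "PD", 1),
      ("105772", "BRIMONIDINE", "PD", 2),
      ("105773", "BRIMONIDINE", "RV", 3),
      ("105774", "TIMOLOL", "RV", 4),
      ("105775", "BRIMONIDINE", "PD", 5),
      ("105776", "TIMOLOL", "PD", 6)
    ).toDF("ID", "BFS", "TYP", "SEQ")
    addNextPdTypId(df).orderBy("ID").show(false)
    spark.stop()
  }
}
```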