Search code examples
dataframeapache-sparkpysparkapache-spark-sql

Tricky harmonizing of ID columns across rows in Spark Dataframe


I have a set of rows, where each event row is uniquely identified by "EventId". A set of events belong to a group, identified by "GUID" and "WFID". Problem is, most of the events do not get both the IDs together in the same event.

An example is below. Only "WF3" has both "GUID" and "WFID". From this, the IDs need to be harmonized across other candidate events (WF1 to WF6):

val df= Seq(
("GUID1",   "",      "WF1", "01-01-2023"),
("GUID1",   "",      "WF2", "01-02-2023"),
("GUID1",   "WFID1", "WF3", "01-03-2023"),
("GUID1",   "",      "WF4", "01-04-2023"),
(""       , "WFID1", "WF5", "01-05-2023"),
("GUID1",   "",      "WF6", "01-06-2023"),
("GUID2",   "",      "WF7", "01-07-2023"),
("",        "WFID2", "WF8", "01-08-2023")
).toDF("GUID", "WFID", "EventId", "Time")
df.show

+-----+-----+-------+----------+
| GUID| WFID|EventId|      Time|
+-----+-----+-------+----------+
|GUID1|     |    WF1|01-01-2023|
|GUID1|     |    WF2|01-02-2023|
|GUID1|WFID1|    WF3|01-03-2023|
|GUID1|     |    WF4|01-04-2023|
|     |WFID1|    WF5|01-05-2023|
|GUID1|     |    WF6|01-06-2023|
|GUID2|     |    WF7|01-07-2023|
|     |WFID2|    WF8|01-08-2023|
+-----+-----+-------+----------+

The requirement is to get the GUID and WFID across all candidate events so that the group of events have the same GUID and WFID. The expected output in the above example should be :

+-----+-----+-------+----------+
| GUID| WFID|EventId|      Time|
+-----+-----+-------+----------+
|GUID1|WFID1|    WF1|01-01-2023|
|GUID1|WFID1|    WF2|01-02-2023|
|GUID1|WFID1|    WF3|01-03-2023|
|GUID1|WFID1|    WF4|01-04-2023|
|GUID1|WFID1|    WF5|01-05-2023|
|GUID1|WFID1|    WF6|01-06-2023|
|GUID2|     |    WF7|01-07-2023|
|     |WFID2|    WF8|01-08-2023|
+-----+-----+-------+----------+

Any idea how this can be implemented in Spark without using UDF?


Solution

  • Here is the working solution. Let me know if have a solution without doing any join!

    val dfDistinct = df.filter(col("GUID") =!= "" && col("WFID") =!= "").select(col("GUID").as("GUID1"), col("WFID").as("WFID1")).distinct()
    
    df.join(dfDistinct, df("GUID") === dfDistinct("GUID1") || df("WFID") === dfDistinct("WFID1"), "left")
    .withColumn("GUIDnew", when(col("GUID1").isNotNull, col("GUID1")).otherwise(col("GUID")))
    .withColumn("WFIDnew", when(col("WFID1").isNotNull, col("WFID1")).otherwise(col("WFID")))
    .select(col("GUIDnew").as("GUID"), col("WFIDnew").as("WFID"), col("EventId"), col("Time"))
    .show
    
    +-----+-----+-------+----------+
    | GUID| WFID|EventId|      Time|
    +-----+-----+-------+----------+
    |GUID1|WFID1|    WF1|01-01-2023|
    |GUID1|WFID1|    WF2|01-02-2023|
    |GUID1|WFID1|    WF3|01-03-2023|
    |GUID1|WFID1|    WF4|01-04-2023|
    |GUID1|WFID1|    WF5|01-05-2023|
    |GUID1|WFID1|    WF6|01-06-2023|
    |GUID2|     |    WF7|01-07-2023|
    |     |WFID2|    WF8|01-08-2023|
    +-----+-----+-------+----------+