Tags: dataframe, scala, join, apache-spark-sql, unpivot

Unpivot columns into multiple columns and values in a Scala dataframe


I have a dataframe with n columns, as shown below.

 +---+------------+--------+-----+-----+-----+------+------+-------+-----+
 | ID|  DATE      | TYPE   |SIG_A|SIG_B|SIG_C|SIG_AA|SIG_BB|SIG_CCC|SIG_D|
 +---+------------+--------+-----+-----+-----+------+------+-------+-----+
 |01 | 01-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 25  |
 |01 | 02-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 22  |
 |01 | 03-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 52  |
 |02 | 01-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 25  |
 |02 | 02-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 22  |
 |02 | 03-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 52  |
 |03 | 01-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 25  |
 |03 | 02-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 22  |
 |03 | 03-01-2021 | TYPE01 | 01  | 02  | 55  | 40   | 63   | 85    | 52  |
 +---+------------+--------+-----+-----+-----+------+------+-------+-----+

I have to unpivot this dataframe based on the column-name pattern: each pattern group (SIG_X, SIG_XX, SIG_XXX) becomes a pair of new columns, one for the signal name and one for its value.

The expected dataframe is shown below (please don't read anything into the values; they have not been corrected for the sample dataframe).

 +---+------------+--------+-----+---------+------+----------+-------+-----------+
 | ID|  DATE      | TYPE   |SIG_X|SIG_X VAL|SIG_XX|SIG_XX VAL|SIG_XXX|SIG_XXX VAL|
 +---+------------+--------+-----+---------+------+----------+-------+-----------+
 |01 | 01-01-2021 | TYPE01 |SIG_A| 02      |SIG_AA| 40       |SIG_CCC| 85        |
 |01 | 01-01-2021 | TYPE01 |SIG_B| 02      |SIG_BB| 40       | NULL  | NULL      |
 |01 | 01-01-2021 | TYPE01 |SIG_C| 02      | NULL | NULL     | NULL  | NULL      |
 |01 | 01-01-2021 | TYPE01 |SIG_D| 02      | NULL | NULL     | NULL  | NULL      |
 |01 | 02-01-2021 | TYPE01 |SIG_A| 02      |SIG_AA| 40       |SIG_CCC| 85        |
 |01 | 02-01-2021 | TYPE01 |SIG_B| 02      |SIG_BB| 40       | NULL  | NULL      |
 |01 | 02-01-2021 | TYPE01 |SIG_C| 02      | NULL | NULL     | NULL  | NULL      |
 |01 | 02-01-2021 | TYPE01 |SIG_D| 02      | NULL | NULL     | NULL  | NULL      |
 .................................................................................
 .................................................................................
 |02 | 01-01-2021 | TYPE01 |SIG_A| 02      |SIG_AA| 40       |SIG_CCC| 85        |
 |02 | 01-01-2021 | TYPE01 |SIG_B| 02      |SIG_BB| 40       | NULL  | NULL      |
 |02 | 01-01-2021 | TYPE01 |SIG_C| 02      | NULL | NULL     | NULL  | NULL      |
 |02 | 01-01-2021 | TYPE01 |SIG_D| 02      | NULL | NULL     | NULL  | NULL      |
 .................................................................................
 .................................................................................
 |03 | 03-01-2021 | TYPE01 |SIG_A| 02      |SIG_AA| 40       |SIG_CCC| 85        |
 |03 | 03-01-2021 | TYPE01 |SIG_B| 02      |SIG_BB| 40       | NULL  | NULL      |
 |03 | 03-01-2021 | TYPE01 |SIG_C| 02      | NULL | NULL     | NULL  | NULL      |
 |03 | 03-01-2021 | TYPE01 |SIG_D| 02      | NULL | NULL     | NULL  | NULL      |
 +---+------------+--------+-----+---------+------+----------+-------+-----------+

I have tried creating a separate dataframe for each of the pattern groups, as shown below (a sketch of one way to build such a dataframe follows the tables).

DF01:
+---+------------+--------+-----+---------+
| ID|  DATE      | TYPE   |SIG_X|SIG_X VAL|
+---+------------+--------+-----+---------+
|01 | 01-01-2021 | TYPE01 |SIG_A| 02      |
|01 | 01-01-2021 | TYPE01 |SIG_B| 02      |
|01 | 01-01-2021 | TYPE01 |SIG_C| 02      |
|01 | 01-01-2021 | TYPE01 |SIG_D| 02      |
|01 | 02-01-2021 | TYPE01 |SIG_A| 02      |
|01 | 02-01-2021 | TYPE01 |SIG_B| 02      |
|01 | 02-01-2021 | TYPE01 |SIG_C| 02      |
|01 | 02-01-2021 | TYPE01 |SIG_D| 02      |
...........................................
...........................................
|02 | 01-01-2021 | TYPE01 |SIG_A| 02      |
|02 | 01-01-2021 | TYPE01 |SIG_B| 02      |
|02 | 01-01-2021 | TYPE01 |SIG_C| 02      |
|02 | 01-01-2021 | TYPE01 |SIG_D| 02      |
...........................................
...........................................
|03 | 03-01-2021 | TYPE01 |SIG_A| 02      |
|03 | 03-01-2021 | TYPE01 |SIG_B| 02      |
|03 | 03-01-2021 | TYPE01 |SIG_C| 02      |
|03 | 03-01-2021 | TYPE01 |SIG_D| 02      |
+---+------------+--------+-----+---------+


DF02:
+---+------------+--------+------+----------+
| ID|  DATE      | TYPE   |SIG_XX|SIG_XX VAL|
+---+------------+--------+------+----------+
|01 | 01-01-2021 | TYPE01 |SIG_AA| 40       |
|01 | 01-01-2021 | TYPE01 |SIG_BB| 40       |
|01 | 02-01-2021 | TYPE01 |SIG_AA| 40       |
|01 | 02-01-2021 | TYPE01 |SIG_BB| 40       |
.............................................
.............................................
|02 | 01-01-2021 | TYPE01 |SIG_AA| 40       |
|02 | 01-01-2021 | TYPE01 |SIG_BB| 40       |
.............................................
.............................................
|03 | 03-01-2021 | TYPE01 |SIG_AA| 40       |
|03 | 03-01-2021 | TYPE01 |SIG_BB| 40       |
+---+------------+--------+------+----------+

DF03:
+---+------------+--------+-------+-----------+
| ID|  DATE      | TYPE   |SIG_XXX|SIG_XXX VAL|
+---+------------+--------+-------+-----------+
|01 | 01-01-2021 | TYPE01 |SIG_CCC| 85        |
|01 | 02-01-2021 | TYPE01 |SIG_CCC| 85        |
...............................................
...............................................
|02 | 01-01-2021 | TYPE01 |SIG_CCC| 85        |
...............................................
...............................................
|03 | 03-01-2021 | TYPE01 |SIG_CCC| 85        |
+---+------------+--------+-------+-----------+
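
One way to build each of these is Spark SQL's stack() function, which unpivots a fixed list of (name, value) pairs. A minimal sketch for DF01, assuming the input dataframe is named df (SIG_X_VAL stands in for the "SIG_X VAL" header, since a column name containing a space would need backticks):

    val sigXDf = df.selectExpr(
      "ID", "DATE", "TYPE",
      // stack(n, name1, value1, ...) emits one output row per (name, value) pair
      "stack(4, 'SIG_A', SIG_A, 'SIG_B', SIG_B, 'SIG_C', SIG_C, 'SIG_D', SIG_D) as (SIG_X, SIG_X_VAL)"
    )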

Then I tried to join these three dataframes on the columns (ID, DATE, TYPE):

  val finalDf = Df01.as("x1")
      .join(Df02.as("x2"), $"x1.id" === $"x2.id" and $"x1.date" === $"x2.date" and $"x1.type" === $"x2.type", "inner")
      .join(Df03.as("x3"), $"x1.id" === $"x3.id" and $"x1.date" === $"x3.date" and $"x1.type" === $"x3.type", "inner")

But the JOIN has been running for a very long time and still hasn't finished...

Is there any way to resolve this? Any leads appreciated!


Solution

  • I have found a solution by creating a new primary-key column for joining; it is provided below. The original join keys (ID, DATE, TYPE) are not unique per unpivoted row, which is why the direct join blows up; a joinkey built from ID, DATE and a per-group row number is unique per row and keeps the join one-to-one.

    If anyone has a better approach, please share that as well (one possible alternative for newer Spark versions is sketched after the output below).

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = Seq(
        (1,"2022-02-01",0,5,10,15,20,25,30),
        (1,"2022-02-02",0,5,10,15,20,25,30),
        (2,"2022-02-01",0,5,10,15,20,25,30),
        (2,"2022-02-02",0,5,10,15,20,25,30)
      ).toDF("ID","DATE","TYPE","SIG_A","SIG_B","SIG_C","SIG_AA","SIG_BB","SIG_AAA")
    val df01 = df.withColumn("DATE", date_format(col("DATE"), "dd-MM-yyyy"))

    // One (name, value) struct per signal column, grouped by name pattern
    val sig01 = List[String]("SIG_A","SIG_B","SIG_C")
    val sig01List = sig01.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))

    val sig02 = List[String]("SIG_AA","SIG_BB")
    val sig02List = sig02.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))

    val sig03 = List[String]("SIG_AAA")
    val sig03List = sig03.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))

    // Unpivot each pattern group by exploding its array of structs (one row per
    // signal), then add a joinkey (ID-DATE-row number) that is unique per row
    val unpiv01 = df01
          .select($"ID", $"DATE", $"TYPE", explode(array(sig01List:_*)) as "feature")
          .select($"ID", $"DATE", $"TYPE", $"feature.sig_name" as "feature1_name", $"feature.sig_value" as "feature1_value")
    val unpiv001 = unpiv01
          .withColumn("row_num", row_number.over(Window.partitionBy("ID","DATE").orderBy("feature1_name")))
          .withColumn("joinkey", concat(col("ID"), lit("-"), col("DATE"), lit("-"), col("row_num")))
          .drop("row_num")

    val unpiv02 = df01
          .select($"ID", $"DATE", $"TYPE", explode(array(sig02List:_*)) as "feature")
          .select($"ID", $"DATE", $"TYPE", $"feature.sig_name" as "feature2_name", $"feature.sig_value" as "feature2_value")
    val unpiv002 = unpiv02
          .withColumn("row_num", row_number.over(Window.partitionBy("ID","DATE").orderBy("feature2_name")))
          .withColumn("joinkey", concat(col("ID"), lit("-"), col("DATE"), lit("-"), col("row_num")))
          .drop("row_num")

    val unpiv03 = df01
          .select($"ID", $"DATE", $"TYPE", explode(array(sig03List:_*)) as "feature")
          .select($"ID", $"DATE", $"TYPE", $"feature.sig_name" as "feature3_name", $"feature.sig_value" as "feature3_value")
    val unpiv003 = unpiv03
          .withColumn("row_num", row_number.over(Window.partitionBy("ID","DATE").orderBy("feature3_name")))
          .withColumn("joinkey", concat(col("ID"), lit("-"), col("DATE"), lit("-"), col("row_num")))
          .drop("row_num")

    // Outer-join the groups on the unique joinkey; groups with fewer signals
    // simply contribute nulls on the right-hand side
    val joineddf = unpiv001.as("x1")
        .join(unpiv002.as("x2"), $"x1.joinkey" === $"x2.joinkey", "outer")
        .join(unpiv003.as("x3"), $"x1.joinkey" === $"x3.joinkey", "outer")
        .select($"x1.ID" as "ID", $"x1.DATE" as "DATE", $"x1.TYPE" as "TYPE",
                $"feature1_name", $"feature1_value", $"feature2_name", $"feature2_value", $"feature3_name", $"feature3_value")
    

    INPUT DATAFRAME:

    +---+----------+----+-----+-----+-----+------+------+-------+
    | ID|      DATE|TYPE|SIG_A|SIG_B|SIG_C|SIG_AA|SIG_BB|SIG_AAA|
    +---+----------+----+-----+-----+-----+------+------+-------+
    |  1|01-02-2022|   0|    5|   10|   15|    20|    25|     30|
    |  1|02-02-2022|   0|    5|   10|   15|    20|    25|     30|
    |  2|01-02-2022|   0|    5|   10|   15|    20|    25|     30|
    |  2|02-02-2022|   0|    5|   10|   15|    20|    25|     30|
    +---+----------+----+-----+-----+-----+------+------+-------+
    
    

    OUTPUT DATAFRAME:

    +---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
    | ID|      DATE|TYPE|feature1_name|feature1_value|feature2_name|feature2_value|feature3_name|feature3_value|
    +---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
    |  1|01-02-2022|   0|        SIG_A|             5|       SIG_AA|            20|      SIG_AAA|            30|
    |  1|01-02-2022|   0|        SIG_B|            10|       SIG_BB|            25|         null|          null|
    |  1|01-02-2022|   0|        SIG_C|            15|         null|          null|         null|          null|
    |  1|02-02-2022|   0|        SIG_A|             5|       SIG_AA|            20|      SIG_AAA|            30|
    |  1|02-02-2022|   0|        SIG_B|            10|       SIG_BB|            25|         null|          null|
    |  1|02-02-2022|   0|        SIG_C|            15|         null|          null|         null|          null|
    |  2|01-02-2022|   0|        SIG_A|             5|       SIG_AA|            20|      SIG_AAA|            30|
    |  2|01-02-2022|   0|        SIG_B|            10|       SIG_BB|            25|         null|          null|
    |  2|01-02-2022|   0|        SIG_C|            15|         null|          null|         null|          null|
    |  2|02-02-2022|   0|        SIG_A|             5|       SIG_AA|            20|      SIG_AAA|            30|
    |  2|02-02-2022|   0|        SIG_B|            10|       SIG_BB|            25|         null|          null|
    |  2|02-02-2022|   0|        SIG_C|            15|         null|          null|         null|          null|
    +---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
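
    For completeness: Spark 3.4+ ships a built-in Dataset.unpivot that removes the explode(array(struct(...))) boilerplate (this assumes you can run Spark 3.4 or later; the API does not exist in earlier versions). A sketch for the first signal group only:

    val unpiv01b = df01.unpivot(
      Array(col("ID"), col("DATE"), col("TYPE")),      // id columns kept as-is
      Array(col("SIG_A"), col("SIG_B"), col("SIG_C")), // columns to unpivot
      "feature1_name",                                 // holds the former column name
      "feature1_value"                                 // holds the corresponding value
    )

    The joinkey/window alignment above stays the same; only the unpivot step gets simpler.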