I have a DataFrame with n columns, as shown below.
+---+------------+--------+-----+-----+-----+------+------+-------+-----+
| ID|    DATE    |  TYPE  |SIG_A|SIG_B|SIG_C|SIG_AA|SIG_BB|SIG_CCC|SIG_D|
+---+------------+--------+-----+-----+-----+------+------+-------+-----+
|01 | 01-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  25 |
|01 | 02-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  22 |
|01 | 03-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  52 |
|02 | 01-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  25 |
|02 | 02-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  22 |
|02 | 03-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  52 |
|03 | 01-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  25 |
|03 | 02-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  22 |
|03 | 03-01-2021 | TYPE01 |  01 |  02 |  55 |  40  |  63  |   85  |  52 |
+---+------------+--------+-----+-----+-----+------+------+-------+-----+
I need to unpivot this DataFrame based on the column-name pattern: the SIG_X, SIG_XX and SIG_XXX columns and their corresponding values should become new unpivoted columns.
The expected DataFrame is shown below (please ignore the values; they have not been adjusted to match the sample DataFrame).
+---+------------+--------+-----+---------+------+----------+-------+-----------+
| ID|    DATE    |  TYPE  |SIG_X|SIG_X VAL|SIG_XX|SIG_XX VAL|SIG_XXX|SIG_XXX VAL|
+---+------------+--------+-----+---------+------+----------+-------+-----------+
|01 | 01-01-2021 | TYPE01 |SIG_A|    02   |SIG_AA|    40    |SIG_CCC|     85    |
|01 | 01-01-2021 | TYPE01 |SIG_B|    02   |SIG_BB|    40    |  NULL |    NULL   |
|01 | 01-01-2021 | TYPE01 |SIG_C|    02   | NULL |   NULL   |  NULL |    NULL   |
|01 | 01-01-2021 | TYPE01 |SIG_D|    02   | NULL |   NULL   |  NULL |    NULL   |
|01 | 02-01-2021 | TYPE01 |SIG_A|    02   |SIG_AA|    40    |SIG_CCC|     85    |
|01 | 02-01-2021 | TYPE01 |SIG_B|    02   |SIG_BB|    40    |  NULL |    NULL   |
|01 | 02-01-2021 | TYPE01 |SIG_C|    02   | NULL |   NULL   |  NULL |    NULL   |
|01 | 02-01-2021 | TYPE01 |SIG_D|    02   | NULL |   NULL   |  NULL |    NULL   |
.................................................................................
.................................................................................
|02 | 01-01-2021 | TYPE01 |SIG_A|    02   |SIG_AA|    40    |SIG_CCC|     85    |
|02 | 01-01-2021 | TYPE01 |SIG_B|    02   |SIG_BB|    40    |  NULL |    NULL   |
|02 | 01-01-2021 | TYPE01 |SIG_C|    02   | NULL |   NULL   |  NULL |    NULL   |
|02 | 01-01-2021 | TYPE01 |SIG_D|    02   | NULL |   NULL   |  NULL |    NULL   |
.................................................................................
.................................................................................
|03 | 03-01-2021 | TYPE01 |SIG_A|    02   |SIG_AA|    40    |SIG_CCC|     85    |
|03 | 03-01-2021 | TYPE01 |SIG_B|    02   |SIG_BB|    40    |  NULL |    NULL   |
|03 | 03-01-2021 | TYPE01 |SIG_C|    02   | NULL |   NULL   |  NULL |    NULL   |
|03 | 03-01-2021 | TYPE01 |SIG_D|    02   | NULL |   NULL   |  NULL |    NULL   |
+---+------------+--------+-----+---------+------+----------+-------+-----------+
I tried creating a separate DataFrame for each column-name pattern, as shown below.
DF01:
+---+------------+--------+-----+---------+
| ID|    DATE    |  TYPE  |SIG_X|SIG_X VAL|
+---+------------+--------+-----+---------+
|01 | 01-01-2021 | TYPE01 |SIG_A|    02   |
|01 | 01-01-2021 | TYPE01 |SIG_B|    02   |
|01 | 01-01-2021 | TYPE01 |SIG_C|    02   |
|01 | 01-01-2021 | TYPE01 |SIG_D|    02   |
|01 | 02-01-2021 | TYPE01 |SIG_A|    02   |
|01 | 02-01-2021 | TYPE01 |SIG_B|    02   |
|01 | 02-01-2021 | TYPE01 |SIG_C|    02   |
|01 | 02-01-2021 | TYPE01 |SIG_D|    02   |
...........................................
...........................................
|02 | 01-01-2021 | TYPE01 |SIG_A|    02   |
|02 | 01-01-2021 | TYPE01 |SIG_B|    02   |
|02 | 01-01-2021 | TYPE01 |SIG_C|    02   |
|02 | 01-01-2021 | TYPE01 |SIG_D|    02   |
...........................................
...........................................
|03 | 03-01-2021 | TYPE01 |SIG_A|    02   |
|03 | 03-01-2021 | TYPE01 |SIG_B|    02   |
|03 | 03-01-2021 | TYPE01 |SIG_C|    02   |
|03 | 03-01-2021 | TYPE01 |SIG_D|    02   |
+---+------------+--------+-----+---------+
DF02:
+---+------------+--------+------+----------+
| ID|    DATE    |  TYPE  |SIG_XX|SIG_XX VAL|
+---+------------+--------+------+----------+
|01 | 01-01-2021 | TYPE01 |SIG_AA|    40    |
|01 | 01-01-2021 | TYPE01 |SIG_BB|    40    |
|01 | 02-01-2021 | TYPE01 |SIG_AA|    40    |
|01 | 02-01-2021 | TYPE01 |SIG_BB|    40    |
.............................................
.............................................
|02 | 01-01-2021 | TYPE01 |SIG_AA|    40    |
|02 | 01-01-2021 | TYPE01 |SIG_BB|    40    |
.............................................
.............................................
|03 | 03-01-2021 | TYPE01 |SIG_AA|    40    |
|03 | 03-01-2021 | TYPE01 |SIG_BB|    40    |
+---+------------+--------+------+----------+
DF03:
+---+------------+--------+-------+-----------+
| ID|    DATE    |  TYPE  |SIG_XXX|SIG_XXX VAL|
+---+------------+--------+-------+-----------+
|01 | 01-01-2021 | TYPE01 |SIG_CCC|     85    |
|01 | 02-01-2021 | TYPE01 |SIG_CCC|     85    |
...............................................
...............................................
|02 | 01-01-2021 | TYPE01 |SIG_CCC|     85    |
...............................................
...............................................
|03 | 03-01-2021 | TYPE01 |SIG_CCC|     85    |
+---+------------+--------+-------+-----------+
I then tried to join these three DataFrames on the columns (ID, DATE, TYPE):
val finalDf = Df01.as("x1")
  .join(Df02.as("x2"), $"x1.id" === $"x2.id" and $"x1.date" === $"x2.date" and $"x1.type" === $"x2.type", "inner")
  .join(Df03.as("x3"), $"x1.id" === $"x3.id" and $"x1.date" === $"x3.date" and $"x1.type" === $"x3.type", "inner")
But the join has been running for a very long time and still has not finished.
Is there any way to resolve this? Any leads are appreciated!
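One possible explanation for the slow join, offered as an assumption because the real data is not shown: (ID, DATE, TYPE) is not unique in any of the intermediate DataFrames, so joining on those three columns alone pairs every row of one DataFrame with every row of the others that shares the key. The plain Scala sketch below only does the arithmetic. The first set of counts is read off the sample tables above; the second set is hypothetical and only illustrates how the product grows with wider inputs.

```scala
// Rows per (ID, DATE, TYPE) key in each intermediate DataFrame, read off the sample tables above.
val rowsPerKey = Seq("DF01" -> 4, "DF02" -> 2, "DF03" -> 1)

// Joining only on (ID, DATE, TYPE) yields every combination of rows that share a key.
val joinedRowsPerKey = rowsPerKey.map(_._2).product

// The expected output has one row per signal of the largest group.
val expectedRowsPerKey = rowsPerKey.map(_._2).max

println(s"joined: $joinedRowsPerKey rows per key, expected: $expectedRowsPerKey")

// Hypothetical wider input: 200, 150 and 100 signal columns per pattern.
val wide = Seq(200L, 150L, 100L)
println(s"wide input: ${wide.product} rows per key instead of ${wide.max}")
```

If this is what is happening, the join needs an extra key component that identifies the position of a row within its group, so that each row matches at most one row on the other side.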
I found a solution: build a synthetic join key (ID, DATE and a row number within each group) so that every unpivoted row matches at most one row in the other DataFrames. The solution is provided below.
If anyone has a better approach, please share it as well.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

// Sample input
val df = Seq(
  (1,"2022-02-01",0,5,10,15,20,25,30),
  (1,"2022-02-02",0,5,10,15,20,25,30),
  (2,"2022-02-01",0,5,10,15,20,25,30),
  (2,"2022-02-02",0,5,10,15,20,25,30)
).toDF("ID","DATE","TYPE","SIG_A","SIG_B","SIG_C","SIG_AA","SIG_BB","SIG_AAA")
// Reformat DATE from yyyy-MM-dd to dd-MM-yyyy
val df01 = df.withColumn("DATE", date_format(col("DATE"),"dd-MM-yyyy"))
// All signal columns (kept for reference; not used below)
val fullSig = List[String]("SIG_A","SIG_B","SIG_C","SIG_AA","SIG_BB","SIG_AAA")
val fullSigList = fullSig.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))
// One list of (sig_name, sig_value) structs per column-name pattern: SIG_X, SIG_XX, SIG_XXX
val sig01 = List[String]("SIG_A","SIG_B","SIG_C")
val sig01List = sig01.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))
val sig02 = List[String]("SIG_AA","SIG_BB")
val sig02List = sig02.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))
val sig03 = List[String]("SIG_AAA")
val sig03List = sig03.map(name => struct(lit(name) as "sig_name", col(name) as "sig_value"))
// SIG_X group: explode the array of structs into one row per signal
val unpiv01 = df01
  .select($"ID",$"DATE",$"TYPE",explode(array(sig01List:_*)) as "feature")
  .select($"ID",$"DATE",$"TYPE",$"feature.sig_name" as "feature1_name", $"feature.sig_value" as "feature1_value")
// Number the rows within each (ID, DATE) and build the join key "ID-DATE-rowNumber"
val unpiv001 = unpiv01
  .withColumn("row_num", row_number().over(Window.partitionBy("ID","DATE").orderBy("feature1_name")))
  .withColumn("joinkey", concat(col("ID"),lit("-"),col("DATE"),lit("-"),col("row_num")))
  .drop("row_num")
// SIG_XX group: same steps
val unpiv02 = df01
  .select($"ID",$"DATE",$"TYPE",explode(array(sig02List:_*)) as "feature")
  .select($"ID",$"DATE",$"TYPE",$"feature.sig_name" as "feature2_name", $"feature.sig_value" as "feature2_value")
val unpiv002 = unpiv02
  .withColumn("row_num", row_number().over(Window.partitionBy("ID","DATE").orderBy("feature2_name")))
  .withColumn("joinkey", concat(col("ID"),lit("-"),col("DATE"),lit("-"),col("row_num")))
  .drop("row_num")
// SIG_XXX group: same steps
val unpiv03 = df01
  .select($"ID",$"DATE",$"TYPE",explode(array(sig03List:_*)) as "feature")
  .select($"ID",$"DATE",$"TYPE",$"feature.sig_name" as "feature3_name", $"feature.sig_value" as "feature3_value")
val unpiv003 = unpiv03
  .withColumn("row_num", row_number().over(Window.partitionBy("ID","DATE").orderBy("feature3_name")))
  .withColumn("joinkey", concat(col("ID"),lit("-"),col("DATE"),lit("-"),col("row_num")))
  .drop("row_num")
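// Join the groups on the synthetic key. ID, DATE and TYPE are taken from x1, so this
// assumes the first group (sig01) has at least as many signals as the other groups.
val joineddf = unpiv001.as("x1")
  .join(unpiv002.as("x2"),$"x1.joinkey" === $"x2.joinkey", "outer")
  .join(unpiv003.as("x3"),$"x1.joinkey" === $"x3.joinkey", "outer")
  .select($"x1.ID" as "ID",$"x1.DATE" as "DATE",$"x1.TYPE" as "TYPE",$"feature1_name",$"feature1_value",$"feature2_name",$"feature2_value",$"feature3_name",$"feature3_value")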
INPUT DATAFRAME:
+---+----------+----+-----+-----+-----+------+------+-------+
| ID| DATE|TYPE|SIG_A|SIG_B|SIG_C|SIG_AA|SIG_BB|SIG_AAA|
+---+----------+----+-----+-----+-----+------+------+-------+
| 1|01-02-2022| 0| 5| 10| 15| 20| 25| 30|
| 1|02-02-2022| 0| 5| 10| 15| 20| 25| 30|
| 2|01-02-2022| 0| 5| 10| 15| 20| 25| 30|
| 2|02-02-2022| 0| 5| 10| 15| 20| 25| 30|
+---+----------+----+-----+-----+-----+------+------+-------+
OUTPUT DATAFRAME:
+---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
| ID| DATE|TYPE|feature1_name|feature1_value|feature2_name|feature2_value|feature3_name|feature3_value|
+---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
| 1|01-02-2022| 0| SIG_A| 5| SIG_AA| 20| SIG_AAA| 30|
| 1|01-02-2022| 0| SIG_B| 10| SIG_BB| 25| null| null|
| 1|01-02-2022| 0| SIG_C| 15| null| null| null| null|
| 1|02-02-2022| 0| SIG_A| 5| SIG_AA| 20| SIG_AAA| 30|
| 1|02-02-2022| 0| SIG_B| 10| SIG_BB| 25| null| null|
| 1|02-02-2022| 0| SIG_C| 15| null| null| null| null|
| 2|01-02-2022| 0| SIG_A| 5| SIG_AA| 20| SIG_AAA| 30|
| 2|01-02-2022| 0| SIG_B| 10| SIG_BB| 25| null| null|
| 2|01-02-2022| 0| SIG_C| 15| null| null| null| null|
| 2|02-02-2022| 0| SIG_A| 5| SIG_AA| 20| SIG_AAA| 30|
| 2|02-02-2022| 0| SIG_B| 10| SIG_BB| 25| null| null|
| 2|02-02-2022| 0| SIG_C| 15| null| null| null| null|
+---+----------+----+-------------+--------------+-------------+--------------+-------------+--------------+
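As one possible alternative, the same shape can probably be produced without any joins or window functions, because the column names (and therefore the number of signals per group) are known before the query runs. The sketch below is not the approach above; it is a variant under these assumptions: a recent Spark version, a local `SparkSession`, signal columns that all start with `SIG_`, the pattern being determined by the length of the column name, and all columns of one group sharing a value type. The names `idCols`, `groups`, `outputRows` and `result` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell a `spark` value already exists.
val spark = SparkSession.builder().master("local[*]").appName("unpivot-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "01-02-2022", 0, 5, 10, 15, 20, 25, 30),
  (2, "01-02-2022", 0, 5, 10, 15, 20, 25, 30)
).toDF("ID", "DATE", "TYPE", "SIG_A", "SIG_B", "SIG_C", "SIG_AA", "SIG_BB", "SIG_AAA")

val idCols = Seq("ID", "DATE", "TYPE")

// Assumption: the pattern is the name length (SIG_X, SIG_XX, SIG_XXX, ...).
val groups: Seq[Seq[String]] = df.columns.toSeq
  .filter(_.startsWith("SIG_"))
  .groupBy(_.length)
  .toSeq
  .sortBy(_._1)
  .map(_._2.sorted)

val maxLen = groups.map(_.size).max

// Output row i holds the i-th signal of every group, or typed nulls once a group runs out.
val outputRows = (0 until maxLen).map { i =>
  val fields = groups.zipWithIndex.flatMap { case (cols, g) =>
    val n = g + 1
    val valueType = df.schema(cols.head).dataType
    cols.lift(i) match {
      case Some(c) =>
        Seq(lit(c) as s"feature${n}_name", col(c) as s"feature${n}_value")
      case None =>
        Seq(lit(null).cast("string") as s"feature${n}_name",
            lit(null).cast(valueType) as s"feature${n}_value")
    }
  }
  struct(fields: _*)
}

// A single explode turns each input row into maxLen output rows; no shuffle is needed.
val result = df
  .select(idCols.map(col) :+ (explode(array(outputRows: _*)) as "r"): _*)
  .select(idCols.map(col) :+ col("r.*"): _*)

result.show(false)
```

Since every input row is expanded independently, this should avoid the shuffles that the window functions and outer joins require, and it does not depend on the first group being the largest. Whether it is actually faster on the real data would need to be measured.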