I have a dataframe
val df = spark.sqlContext.createDataFrame(Seq(
  ("100", "3", "sceince", "A"),
  ("100", "3", "maths", "B"),
  ("100", "3", "maths", "F"),
  ("100", "3", "computesrs", "null"),
  ("101", "2", "maths", "E"),
  ("101", "2", "computesrs", "C"),
  ("102", "2", "maths", "null"),
  ("102", "2", "computesrs", "C"),
  ("100", "2", "maths", "D"),
  ("100", "2", "computesrs", "C")
)).toDF("Rid", "class", "subject", "teacher")
scala> df.show
+---+-----+----------+-------+
|Rid|class|   subject|teacher|
+---+-----+----------+-------+
|100|    3|   sceince|      A|
|100|    3|     maths|      B|
|100|    3|     maths|      F|
|100|    3|computesrs|   null|
|101|    2|     maths|      E|
|101|    2|computesrs|      C|
|102|    2|     maths|   null|
|102|    2|computesrs|      C|
|100|    2|     maths|      D|
|100|    2|computesrs|      C|
+---+-----+----------+-------+
I have to pivot this dataframe into some (5) fixed columns, grouping by Rid and class. The subject column may have any number of distinct values, but per Rid and class we have to generate subject and teacher columns as key-value pairs.
Expected dataframe:
+-------+-------+-----------+---------------+---------------+---------------+-----------+---------------+---------------+---------------+--------+--------------+
|Rid |class |period1 |periodteacher1 |period2 |periodteacher2 |period3 |periodteacher3 |period4 |periodteacher4 |period5 |periodteacher5|
+-------+-------+-----------+---------------+---------------+---------------+-----------+---------------+---------------+---------------+--------+--------------+
|100 |3 |sceince |A |maths |B |maths |F |computesrs | | | |
|100 |2 |maths |D |computesrs |C | | | | | | |
|101 |2 |maths |E |computesrs |C | | | | | | |
|102 |2 |maths | |computesrs |C | | | | | | |
+-------+-------+-----------+---------------+---------------+---------------+-----------+---------------+---------------+---------------+--------+--------------+
Any suggestions ?
You need to rank the subjects within each (Rid, class) group and then apply a pivot. Refer to the code below.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val df = Seq((100,3,"sceince","A"), (100,3,"maths","B"), (100,3,"maths","F"), (100,3,"computesrs",null), (101,2,"maths","E"), (101,2,"computesrs","C"), (102,2,"maths",null), (102,2,"computesrs","C"), (100,2,"maths","D"), (100,2,"computesrs","C")).toDF("Rid", "class", "subject", "teacher")
df: org.apache.spark.sql.DataFrame = [Rid: int, class: int ... 2 more fields]
scala> df.show
+---+-----+----------+-------+
|Rid|class| subject|teacher|
+---+-----+----------+-------+
|100| 3| sceince| A|
|100| 3| maths| B|
|100| 3| maths| F|
|100| 3|computesrs| null|
|101| 2| maths| E|
|101| 2|computesrs| C|
|102| 2| maths| null|
|102| 2|computesrs| C|
|100| 2| maths| D|
|100| 2|computesrs| C|
+---+-----+----------+-------+
Create a window specification for ranking:
scala> val wind2 = Window.partitionBy("Rid", "class").orderBy("subject")
wind2: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@6e13e2fc
scala> val resDF = df.withColumn("rank", row_number().over(wind2))
resDF: org.apache.spark.sql.DataFrame = [Rid: int, class: int ... 3 more fields]
scala> resDF.show
+---+-----+----------+-------+----+
|Rid|class| subject|teacher|rank|
+---+-----+----------+-------+----+
|102| 2|computesrs| C| 1|
|102| 2| maths| null| 2|
|101| 2|computesrs| C| 1|
|101| 2| maths| E| 2|
|100| 2|computesrs| C| 1|
|100| 2| maths| D| 2|
|100| 3|computesrs| null| 1|
|100| 3| maths| B| 2|
|100| 3| maths| F| 3|
|100| 3| sceince| A| 4|
+---+-----+----------+-------+----+
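One caveat worth noting: ordering the window by subject alone leaves the relative rank of duplicate subjects (the two maths rows for Rid 100, class 3) undefined, so reruns may swap them. If a stable order matters, a sketch of one possible fix is to add a tiebreaker column to the sort, for example teacher:

```scala
// Sketch: order by teacher as a tiebreaker so duplicate subjects within a
// (Rid, class) group always receive the same rank across runs.
val wind2 = Window.partitionBy("Rid", "class").orderBy(col("subject"), col("teacher"))
```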
Now apply the pivot. Note that, because the window orders by subject, each group's periods come out in alphabetical subject order rather than the original row order:
scala> resDF.groupBy("Rid", "class").pivot("rank", Seq("1", "2", "3", "4", "5")).agg(first($"subject"), first($"teacher")).show
+---+-----+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|Rid|class|1_first(subject, false)|1_first(teacher, false)|2_first(subject, false)|2_first(teacher, false)|3_first(subject, false)|3_first(teacher, false)|4_first(subject, false)|4_first(teacher, false)|5_first(subject, false)|5_first(teacher, false)|
+---+-----+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|102|    2|             computesrs|                      C|                  maths|                   null|                   null|                   null|                   null|                   null|                   null|                   null|
|101|    2|             computesrs|                      C|                  maths|                      E|                   null|                   null|                   null|                   null|                   null|                   null|
|100|    2|             computesrs|                      C|                  maths|                      D|                   null|                   null|                   null|                   null|                   null|                   null|
|100|    3|             computesrs|                   null|                  maths|                      B|                  maths|                      F|                sceince|                      A|                   null|                   null|
+---+-----+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
Finally, rename the generated columns to the expected names using .withColumnRenamed(<existingName>, <newName>).
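As a sketch of that final renaming step (the local[*] master and app name are assumptions for a standalone run): the snippet below reuses the ranking and pivot from above, then renames every pivoted column positionally with withColumnRenamed, so it works regardless of how Spark prints the generated first(...) aggregate names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("pivot-rename").getOrCreate()
import spark.implicits._

val df = Seq(
  (100, 3, "sceince", "A"), (100, 3, "maths", "B"), (100, 3, "maths", "F"),
  (100, 3, "computesrs", null), (101, 2, "maths", "E"), (101, 2, "computesrs", "C"),
  (102, 2, "maths", null), (102, 2, "computesrs", "C"),
  (100, 2, "maths", "D"), (100, 2, "computesrs", "C")
).toDF("Rid", "class", "subject", "teacher")

val wind2 = Window.partitionBy("Rid", "class").orderBy("subject")
val pivoted = df.withColumn("rank", row_number().over(wind2))
  .groupBy("Rid", "class")
  .pivot("rank", Seq("1", "2", "3", "4", "5"))
  .agg(first($"subject"), first($"teacher"))

// Target names: Rid, class, then a period<i>/periodteacher<i> pair for i = 1..5.
val newNames = Seq("Rid", "class") ++
  (1 to 5).flatMap(i => Seq(s"period$i", s"periodteacher$i"))

// Rename by position so we never have to spell out "1_first(subject, false)".
val renamed = pivoted.columns.zip(newNames).foldLeft(pivoted) {
  case (acc, (oldName, newName)) => acc.withColumnRenamed(oldName, newName)
}
renamed.show(false)
```

An equivalent one-liner for the rename is pivoted.toDF(newNames: _*), which replaces all column names at once.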