I have the following pyspark dataframe.
+------------------------------------+-------+-----------+-----------+---+
|id |orderid|item |cpid |pos|
+------------------------------------+-------+-----------+-----------+---+
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"814735847"|0 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"596343371"|1 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"814735847"| |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"222288904"|3 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"183692578"|2 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"222288904"| |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578 |"303519145"|4 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"390801409"|0 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"129141834"|3 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"900045087"|1 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"280043267"|2 |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"390801409"| |
|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105 |"335625185"|4 |
+------------------------------------+-------+-----------+-----------+---+
And I would like to pivot these rows into a single row per (`id`, `orderid`, `item`), with one `cpid` column per `pos` value:
| id | orderid | item | pos_0_cpid | pos_1_cpid | pos_2_cpid | pos_3_cpid | pos_4_cpid |
|---------------------------------------------|----------------|------------------|-------------------|-------------------|-------------------|-------------------|-------------------|
| 7d6d7b44-440b-45d7-b25a-3d55817a889b | # | 708542578 | 814735847 | 596343371 | 183692578 | 222288904 | 303519145 |
| 7d6d7b44-440b-45d7-b25a-3d55817a889b | # | 152507105 | 390801409 | 900045087 | 280043267 | 129141834 | 335625185 |
How can I achieve this?
Try creating a temporary column that holds the desired output column names (e.g. `expr('concat("pos_",pos,"_cpid")')`), then do **groupBy + pivot + agg**.
Finally, drop the `null` (or `''`) column that the rows with a missing `pos` produce in the final dataframe.
Example:
# sample DataFrame
df.show()
#+--------------------+-------+---------+---------+----+
#| id|orderid| item| cpid| pos|
#+--------------------+-------+---------+---------+----+
#|7d6d7b44-440b-45d...| #|708542578|814735847| 0|
#|7d6d7b44-440b-45d...| #|708542578|596343371| 1|
#|7d6d7b44-440b-45d...| #|708542578|814735847|null|
#|7d6d7b44-440b-45d...| #|708542578|222288904| 3|
#|7d6d7b44-440b-45d...| #|708542578|183692578| 2|
#|7d6d7b44-440b-45d...| #|708542578|222288904|null|
#|7d6d7b44-440b-45d...| #|708542578|303519145| 4|
#|7d6d7b44-440b-45d...| #|152507105|390801409| 0|
#|7d6d7b44-440b-45d...| #|152507105|129141834| 3|
#|7d6d7b44-440b-45d...| #|152507105|900045087| 1|
#|7d6d7b44-440b-45d...| #|152507105|280043267| 2|
#|7d6d7b44-440b-45d...| #|152507105|390801409|null|
#|7d6d7b44-440b-45d...| #|152507105|335625185| 4|
#+--------------------+-------+---------+---------+----+
from pyspark.sql.functions import expr, first, col

df.withColumn("pos", expr('concat("pos_",pos,"_cpid")')).\
    groupBy("id", "orderid", "item").\
    pivot("pos").\
    agg(first(col("cpid"))).\
    drop("null").\
    show(10, False)
#+------------------------------------+-------+---------+----------+----------+----------+----------+----------+
#|id |orderid|item |pos_0_cpid|pos_1_cpid|pos_2_cpid|pos_3_cpid|pos_4_cpid|
#+------------------------------------+-------+---------+----------+----------+----------+----------+----------+
#|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |152507105|390801409 |900045087 |280043267 |129141834 |335625185 |
#|7d6d7b44-440b-45d7-b25a-3d55817a889b|# |708542578|814735847 |596343371 |183692578 |222288904 |303519145 |
#+------------------------------------+-------+---------+----------+----------+----------+----------+----------+