
I need to aggregate and transpose one column's rows into multiple columns in PySpark (long to wide format)


I need to "explode" one column into multiple columns, similar to a pivot, but with new column names.

Given this Spark dataframe:

df = spark.createDataFrame(
    [(1, 6, 200),
     (1, 6, 300),
     (1, 6, 400),
     (1, 7, 1000),
     (1, 7, 5000)],
    "id int, term int, amount int")

df.show()

+---+----+------+
| id|term|amount|
+---+----+------+
|  1|   6|   200|
|  1|   6|   300|
|  1|   6|   400|
|  1|   7|  1000|
|  1|   7|  5000|
+---+----+------+

How can I break "amount" into new columns? Some cells will be empty, which is OK.



Solution

  • Group by the id / term columns, collecting the amount values into a list, then select the resulting columns based on the maximum size of the amt array column (derived from amount):

    import pyspark.sql.functions as F
    
    # collect the amount values per (id, term) group into an array;
    # note that collect_list does not guarantee a deterministic order
    new_df = (df.groupBy(['id', 'term'])
              .agg(F.collect_list(F.col("amount")).alias("amt")))
    # get the max size of an array in the `amt` column
    max_arr_size = new_df.select(F.max(F.size('amt'))).first()[0]
    
    # expand each array element into its own amt_<n> column
    new_df.select('id', 'term', *[F.col('amt')[i].alias(f'amt_{i+1}')
                                  for i in range(max_arr_size)]).show()
    

    +---+----+-----+-----+-----+
    | id|term|amt_1|amt_2|amt_3|
    +---+----+-----+-----+-----+
    |  1|   6|  200|  300|  400|
    |  1|   7| 1000| 5000| null|
    +---+----+-----+-----+-----+