
PySpark - How to assign column names to default key 'key' and its values to 'value'


Consider the sample data below -

(image of sample data - a dataframe df_data with the columns Name, Country, Property, No and Place)
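
For reference, df_data can be recreated with something like the snippet below (a single row inferred from the JSON outputs shown further down; the actual data may contain more rows):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Row values inferred from the actual/desired outputs below
df_data = spark.createDataFrame(
    [("David", "Dubai", "House", "1", "JLT")],
    ["Name", "Country", "Property", "No", "Place"],
)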

I am running the code below -

from pyspark.sql.functions import *

df_result1 = df_data.groupBy(
    col("Name").alias("Name"), col("Country").alias("Country"), col("Property").alias("Property")
).agg(
    collect_list(struct(col("No"), col("Place"))).alias("details")
)

df_result2 = df_result1.groupBy(
    col("Name").alias("Name"), col("Country").alias("Country")
).agg(
    collect_list(struct(col("Property"), col("details"))).alias("Main")
)
final = df_result2.toJSON().collect()

Actual Output -

{
  "Name": "David",
  "Country": "Dubai",
  "Main": [
    {
      "Property": "House",
      "details": [
        {
          "No": "1",
          "Place": "JLT"
        }
      ]
    }
  ]
}

Desired Output -

{
  "Name": "David",
  "Country": "Dubai",
  "Main": [
    {
      "Property": "House",
      "details": [
        {
          "key": "No",
          "value": "1"
        },
        {
          "key": "Place",
          "value": "JLT"
        }
      ]
    }
  ]
}

Kindly suggest how to achieve this.


Solution

  • Create an array of structs, where each struct holds a key/value pair: the key is the column name (as a literal) and the value is the actual value in that column. Then group the dataframe by Name and Country and collect the Property column together with this array into a list of structs aliased as Main.

    from pyspark.sql import functions as F
    
    # Build an array of structs: each struct pairs the column name ('key')
    # with that column's value ('value')
    details = F.array(*[
        F.struct(F.lit(c).alias('key'), F.col(c).alias('value'))
        for c in ('No', 'Place')
    ])

    # Group and collect Property together with the key/value array as Main
    df_result = df_data.groupBy('Name', 'Country').agg(
        F.collect_list(
            F.struct('Property', details.alias('details'))
        ).alias('Main')
    )
    

    df_result.toJSON().collect()
    
    ['{"Name":"David","Country":"Dubai","Main":[{"Property":"House","details":[{"key":"No","value":"1"},{"key":"Place","value":"JLT"}]}]}']
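
  • If the detail columns are not known up front, the same idea can be generalised by deriving them from the dataframe itself. Below is a minimal sketch, assuming df_data contains only the grouping columns, Property and the detail columns (the group_cols / detail_cols names are just illustrative); the cast to string is there because F.array requires every element to share one type, and can be dropped if all detail columns are already strings:

    from pyspark.sql import functions as F

    group_cols = ['Name', 'Country']

    # Every remaining column except Property is treated as a detail column
    detail_cols = [c for c in df_data.columns if c not in group_cols + ['Property']]

    # Cast values to string so every struct in the array shares one schema
    details = F.array(*[
        F.struct(F.lit(c).alias('key'), F.col(c).cast('string').alias('value'))
        for c in detail_cols
    ])

    df_result = df_data.groupBy(*group_cols).agg(
        F.collect_list(F.struct('Property', details.alias('details'))).alias('Main')
    )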