Tags: pyspark, aws-glue, aws-glue-spark

PySpark: turn a list into a dictionary in a specific column


I have a Spark DataFrame that looks like this in JSON:

{
  "site_id": "ABC",
  "region": "Texas",
  "areas": [
    {
      "Carbon": [
        "ABB",
        "ABD",
        "ABE"
      ]
    }
  ],
  "site_name": "ABC"
}

and I need to turn the "areas" column into this:

"areas": [
  {
    "area_name": "Carbon",
    "pls": [
      {
        "pl_name": "ABB"
      },
      {
        "pl_name": "ABD"
      },
      {
        "pl_name": "ABE"
      }
    ]
  }
]

I already tried df.collect() and manipulating the dictionaries directly, but that added complexity. Is there a way to do this in the DataFrame itself?

Edit: here is the input schema:

|-- site_id: string
|-- region: string
|-- site_name: string
|-- areas: array
|    |-- element: map
|    |    |-- keyType: string
|    |    |-- valueType: array
|    |    |    |-- element: string

In the output schema, the goal is for the valueType to be a dictionary as well. I save the data to a DynamoDB table, so when the table is scanned, the output should look like the example I provided.


Solution

  • As far as I understand, processing and producing JSON is not Spark's strength. The easiest approach (short of flattening, grouping, collecting, pivoting, etc.) is a UDF. A Python UDF is slower than built-in Spark transformations, because every row has to be serialized between the JVM and a Python worker, but if your data is not very large it shouldn't be a problem.

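    from pyspark.sql import functions as F, types as T

    def transform_json(arr):
        # Spark passes array<map<string, array<string>>> to Python as a list of dicts
        if arr is None:
            return None
        r = []
        for e in arr:
            # A map may hold several area names; emit one struct per key
            for k, pls in (e or {}).items():
                r.append({
                    'area_name': k,
                    'pls': [{'pl_name': i} for i in (pls or [])]
                })
        return r

    # Return type: array<struct<area_name: string, pls: array<struct<pl_name: string>>>>
    areas_type = T.ArrayType(T.StructType([
        T.StructField('area_name', T.StringType()),
        T.StructField('pls', T.ArrayType(T.StructType([
            T.StructField('pl_name', T.StringType())
        ]))),
    ]))

    (df
        .withColumn('areas', F.udf(transform_json, areas_type)('areas'))
        .show(10, False)
    )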
    
    # Output (test DataFrame with only the areas column and an extra Oxygen entry)
    # +------------------------------------------------------------------+
    # |areas                                                             |
    # +------------------------------------------------------------------+
    # |[{Carbon, [{ABB}, {ABD}, {ABE}]}, {Oxygen, [{ABB}, {ABD}, {ABE}]}]|
    # +------------------------------------------------------------------+
    
    # Schema
    # root
    #  |-- areas: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- area_name: string (nullable = true)
    #  |    |    |-- pls: array (nullable = true)
    #  |    |    |    |-- element: struct (containsNull = true)
    #  |    |    |    |    |-- pl_name: string (nullable = true)