Search code examples
pythonjsondataframeapache-sparkpyspark

How to Flatten JSON file using pyspark


I need to flatten JSON file so that I can get output in table format.Ihavetried but not getting the output that I want

This is my JSON file :-

{
    "records": [
        {
            "name": "A",
            "last_name": "B",
            "special_values": [
                {
                    "name": "address",
                    "value": "some adress"
                },
                {
                    "name": "city",
                    "value": "Chd"
                },
                {
                    "name": "zip_code",
                    "value": "160036"
                }
            ]
        },
        {
            "name": "X",
            "last_name": "Y",
            "special_values": [
                {
                    "name": "adress",
                    "value": "some adress"
                },
                {
                    "name": "city",
                    "value": "Dallas"
                },
                {
                    "name": "zip_code",
                    "value": "02431"
                }
            ]
        }
    ]
}

I want this output:-

|name|last_name|address|city  |zip_code|FIELD7|
|----|---------|-------|------|--------|------|
|A   |B        |some   |adress|chd     |160036|
|X   |Y        |some   |adress|Dallas  |02431 |

I have tried this code but getting different output :-

df = spark.read.option("multiline","true").json(r"C:\Users\Lajo\Downloads\spark_ex2_input.json")
from pyspark.sql.types import *
from pyspark.sql.functions import explode_outer,col


def flatten(df):
   # compute Complex Fields (Lists and Structs) in Schema   
   complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   while len(complex_fields)!=0:
      col_name=list(complex_fields.keys())[0]
      print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name])))
    
      # if StructType then convert all sub element to columns.
      # i.e. flatten structs
      if (type(complex_fields[col_name]) == StructType):
         expanded = [col(col_name+'.'+k).alias(col_name+'_'+k) for k in [ n.name for n in  complex_fields[col_name]]]
         df=df.select("*", *expanded).drop(col_name)
    
      # if ArrayType then add the Array Elements as Rows using the explode function
      # i.e. explode Arrays
      elif (type(complex_fields[col_name]) == ArrayType):    
         df=df.withColumn(col_name,explode_outer(col_name))
    
      # recompute remaining Complex Fields in Schema       
      complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   return df

df_flatten = flatten(df)
df_flatten.show()

Solution

  • One option is to flatten the data before making it into a data frame. Consider reading the JSON file with the built-in json library. Then you can perform the following operation on the resulting data object.

    data = data["records"] # It seems that the data you want is in "records"
    for entry in data:
        for special_value in entry["special_values"]: # Add each special value to the entry
            entry[special_value["name"]] = special_value["value"]
        del entry["special_values"] # Remove the "special_values" list
    

    See below for an example using list and dict comprehension (about 10% slower than the above). The x | y syntax is for merging the dicts created with dict comprehension. You can use it in Python versions > 3.9.0.

    data = [
        (
            {k: v for k, v in entry.items() if k != "special_values"} |
            {
                value["name"]: value["value"]
                for value in entry["special_values"]
            }
        )
        for entry in data["records"]
    ]
    

    Here it is on one line:

    data = [({k: v for k, v in entry.items() if k != "special_values"} | {value["name"]: value["value"] for value in entry["special_values"]}) for entry in data["records"]]
    

    Here is the before and after.

    {
        "records": [
            {
                "name": "A",
                "last_name": "B",
                "special_values": [
                    {
                        "name": "address",
                        "value": "some adress"
                    },
                    {
                        "name": "city",
                        "value": "Chd"
                    },
                    {
                        "name": "zip_code",
                        "value": "160036"
                    }
                ]
            },
            {
                "name": "X",
                "last_name": "Y",
                "special_values": [
                    {
                        "name": "adress",
                        "value": "some adress"
                    },
                    {
                        "name": "city",
                        "value": "Dallas"
                    },
                    {
                        "name": "zip_code",
                        "value": "02431"
                    }
                ]
            }
        ]
    }
    
    [{'address': 'some adress',
      'city': 'Chd',
      'last_name': 'B',
      'name': 'A',
      'zip_code': '160036'},
     {'adress': 'some adress',
      'city': 'Dallas',
      'last_name': 'Y',
      'name': 'X',
      'zip_code': '02431'}]
    

    You can, then, create a data frame from the data object.

    df = spark.createDataFrame(data)
    df.show()
    
    +-----------+------+---------+----+--------+-----------+
    |    address|  city|last_name|name|zip_code|     adress|
    +-----------+------+---------+----+--------+-----------+
    |some adress|   Chd|        B|   A|  160036|       NULL|
    |       NULL|Dallas|        Y|   X|   02431|some adress|
    +-----------+------+---------+----+--------+-----------+