Tags: json, pyspark, apache-spark-sql

Parse Nested JSON payload using pyspark


I am trying to parse and flatten nested JSON data using PySpark. Any suggestions on how to parse such a JSON file?

Here is the sample code I have tried so far, but it was not successful.

jsonData ="""{
    "data": {
        "unique_id1": {
            "random_code1": {
                "name": "some_name",
                "status": "value1"
            },
            "random_code2": {
                "name": "some_name",
                "status": "value2"
            }
        },
        "unique_id2": {
            "random_code3": {
                "name": "some_name",
                "status": "value2"
            },
            "random_code4": {
                "name": "some_name",
                "status": "value2"
            }
        }
    }

}"""
import re

df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))

# Rewrite the inferred schema string so the arbitrary keys get fixed names.
data_schema = df.schema["data"].dataType.simpleString()
data_schema = re.sub(r"([\w\-]+)(?=:struct<name)", "_RandomCode", data_schema)
data_schema = re.sub(r"([\w\-]+)(?=:struct<_RandomCode)", "_Ids", data_schema)
data_schema = re.sub(r"(?<=,|<)([^,<]+)(?=:)", r"`\1`", data_schema)

Expected output

_Ids          _RandomCode     name
unique_id1    random_code1    some_name
unique_id1    random_code2    some_name
unique_id2    random_code3    some_name
unique_id2    random_code4    some_name

Solution

  • You can parse the data by mapping over the underlying RDD, converting the result back to a DataFrame, and then extracting the desired columns.

    from pyspark.sql.functions import explode_outer
    
    jsonData ="""{
        "data": {
            "unique_id1": {
                "random_code1": {
                    "name": "some_name",
                    "status": "value1"
                },
                "random_code2": {
                    "name": "some_name",
                    "status": "value2"
                }
            },
            "unique_id2": {
                "random_code3": {
                    "name": "some_name",
                    "status": "value2"
                },
                "random_code4": {
                    "name": "some_name",
                    "status": "value2"
                }
            }
        }
    
    }"""
    df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))
    
    def parse_data(x):
        # x is a Row whose single field is the "data" struct. Walk the
        # top-level keys (the unique ids), then each nested key (the
        # random codes), collecting [id, code, name] triples.
        out = []
        outer_list = [(key, x[0].asDict()[key].asDict()) for key in x[0].asDict().keys()]

        for row in outer_list:
            for sub_row in [(key, row[1][key]['name']) for key in row[1].keys()]:
                out.append([row[0], sub_row[0], sub_row[1]])

        return out
    
    # Convert to RDD and parse the data, convert the RDD
    # back to a dataframe and explode.
    df = df.rdd.map(lambda x: (1, parse_data(x))).toDF(['dummy', 'parsed_data'])
    df = df.withColumn('parsed_data', explode_outer(df['parsed_data']))
    
    # Fetch desired columns from the parsed data.
    df = (df.withColumn('_Ids', df['parsed_data'].getItem(0))
            .withColumn('_RandomCode', df['parsed_data'].getItem(1))
            .withColumn('name', df['parsed_data'].getItem(2))
            .drop('dummy', 'parsed_data'))
    
    df.show()
    
    +----------+------------+---------+
    |      _Ids| _RandomCode|     name|
    +----------+------------+---------+
    |unique_id1|random_code1|some_name|
    |unique_id1|random_code2|some_name|
    |unique_id2|random_code3|some_name|
    |unique_id2|random_code4|some_name|
    +----------+------------+---------+
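
  • Alternatively, you can stay in the DataFrame API by supplying an explicit schema that models the arbitrary keys as map keys and exploding twice. This is a minimal sketch rather than part of the answer above; it assumes the inner structs always carry name and status fields, as in the sample payload. It produces the same four rows.

    from pyspark.sql.functions import col, explode
    from pyspark.sql.types import MapType, StringType, StructField, StructType

    # Treat the arbitrary keys (unique_id*, random_code*) as map keys
    # rather than struct fields so each entry can become a row.
    inner = StructType([
        StructField("name", StringType()),
        StructField("status", StringType()),
    ])
    schema = StructType([
        StructField("data", MapType(StringType(), MapType(StringType(), inner)))
    ])

    alt = (spark.read.schema(schema)
                .option("multiLine", "true")
                .json(spark.sparkContext.parallelize([jsonData])))

    # Exploding a map column yields one (key, value) pair per row.
    alt = (alt.select(explode(col("data")).alias("_Ids", "codes"))
              .select("_Ids", explode(col("codes")).alias("_RandomCode", "payload"))
              .select("_Ids", "_RandomCode", col("payload.name").alias("name")))

    alt.show()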