I am trying to parse and flatten nested data using PySpark. Any suggestions on how to parse such a JSON file?
Here is the sample code I have tried so far, but it was not successful.
# Sample nested JSON: "data" maps dynamic unique-id keys to objects whose
# keys are themselves dynamic "random code" names, so Spark's inferred
# schema has no fixed column names to select from.
jsonData ="""{
"data": {
"unique_id1": {
"random_code1": {
"name": "some_name",
"status": "value1"
},
"random_code2": {
"name": "some_name",
"status": "value2"
}
},
"unique_id2": {
"random_code3": {
"name": "some_name",
"status": "value2"
},
"random_code4": {
"name": "some_name",
"status": "value2"
}
}
}
}"""
# Read the JSON string as one multi-line document via a single-element RDD.
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))
# Attempted approach: take the inferred schema of the "data" struct as a
# string and rewrite the dynamic field names into fixed ones, presumably to
# re-read the data against the edited schema (the attempt was unsuccessful).
data_schema = df.schema["data"].dataType.simpleString()
# Replace every field name immediately followed by ":struct<name" (i.e. the
# random-code level) with the literal "_RandomCode".
data_schema = re.sub(r"([\w\-]+)(?=:struct<name)", "_RandomCode", data_schema)
# Replace every field name followed by ":struct<_RandomCode" (the id level)
# with the literal "_Ids".
data_schema = re.sub(r"([\w\-]+)(?=:struct<_RandomCode)", "_Ids", data_schema)
# Back-quote the remaining field names so the schema string stays parseable.
data_schema = re.sub(r"(?<=,|<)([^,<]+)(?=:)", r"`\1`", data_schema)
Expected output:
_Ids _RandomCode name
unique_id1 random_code1 some_name
unique_id1 random_code2 some_name
unique_id2 random_code3 some_name
unique_id2 random_code4 some_name
You can parse the data by using `map` on the underlying RDD, then convert the result back into a DataFrame and extract the relevant columns from the parsed data.
from pyspark.sql.functions import explode_outer
# Same sample payload as in the question: dynamic unique ids -> dynamic
# random codes -> {name, status} leaves.
jsonData ="""{
"data": {
"unique_id1": {
"random_code1": {
"name": "some_name",
"status": "value1"
},
"random_code2": {
"name": "some_name",
"status": "value2"
}
},
"unique_id2": {
"random_code3": {
"name": "some_name",
"status": "value2"
},
"random_code4": {
"name": "some_name",
"status": "value2"
}
}
}
}"""
# Parse the JSON string into a one-row DataFrame with a single "data"
# struct column (read as one multi-line document).
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))
def parse_data(x):
    """Flatten one Row of the nested payload into [id, code, name] triples.

    ``x[0]`` is the "data" struct: a Row keyed by unique ids, where each
    value is a Row keyed by random codes, each of those holding a 'name'
    field. Returns a list of 3-element lists in key order.
    """
    data = x[0].asDict()
    flattened = []
    # Walk the two dynamic-key levels and collect one triple per leaf.
    for uid in data:
        codes = data[uid].asDict()
        for code in codes:
            flattened.append([uid, code, codes[code]['name']])
    return flattened
# Convert to RDD and parse the data, convert the RDD
# back to a dataframe and explode.
# The constant 1 is a throwaway value so each mapped record is a 2-tuple
# that toDF can name; parse_data supplies the list of [id, code, name]
# triples for the row.
df = df.rdd.map(lambda x: (1, parse_data(x))).toDF(['dummy', 'parsed_data'])
# explode_outer emits one output row per triple in the parsed_data array
# (and a null row rather than dropping the record if the array were empty).
df = df.withColumn('parsed_data', explode_outer(df['parsed_data']))
# Fetch desired columns from the parsed data.
# getItem(i) extracts element i of each exploded 3-element array.
df = (df.withColumn('_Ids', df['parsed_data'].getItem(0))
.withColumn('_RandomCode', df['parsed_data'].getItem(1))
.withColumn('name', df['parsed_data'].getItem(2))
.drop('dummy', 'parsed_data'))
# Display the flattened result.
df.show()
+----------+------------+---------+
| _Ids| _RandomCode| name|
+----------+------------+---------+
|unique_id1|random_code1|some_name|
|unique_id1|random_code2|some_name|
|unique_id2|random_code3|some_name|
|unique_id2|random_code4|some_name|
+----------+------------+---------+