Tags: json, pyspark, azure-synapse, flatten, spark-notebook

How do I flatten this complex JSON using PySpark?


Background: I am using Azure Synapse to pipe data from an API into a JSON file. I am then using a PySpark notebook to flatten that complex JSON so that I can load the data into a SQL database.

I have spent hours trying to modify this PySpark code to flatten the JSON appropriately and have had zero luck; any help would be appreciated.

Structure of Complex JSON

[Screenshot of the JSON file's structure]
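
The screenshot itself isn't reproduced here. Judging from the field paths referenced in the code below (response.requestId, response.code, response.result.reportData, current_residents), the file presumably looks roughly like this (values invented for illustration):

    {
      "response": {
        "requestId": "...",
        "code": 200,
        "result": {
          "reportData": [
            {
              "current_residents": [
                {
                  "property_name": "...",
                  "bldg_unit": "...",
                  "unit_address": ["..."],
                  "market_rent": "..."
                }
              ]
            },
            "Done"
          ]
        }
      }
    }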

My PySpark Code

from pyspark.sql.functions import explode, col

dfGetReportData = spark.read.option("multiline", "true").option("mode", "PERMISSIVE").json('xxxxxxxxxxxxx')
#dfGetReportData.printSchema()
#dfGetReportData.show()

dfReportData = dfGetReportData.select("response.requestId", "response.code",explode("response.result.reportData").alias("reportData"))
#dfData.printSchema()
#dfData.show()

dfRentRoll = dfReportData.select(explode("reportData.current_residents").alias("current_residents"))
#dfRentRoll.printSchema()

dfCurrent_Residents = dfRentRoll.select("current_residents.property_name",
       "current_residents.property",
        "current_residents.lookup_code",
        "current_residents.bldg_unit",
        "current_residents.bldg",
        "current_residents.unit",
        "current_residents.floorplan_name",
        "current_residents.unit_type",
        "current_residents.space_option",
        "current_residents.sqft",
        "current_residents.unit_status",
        "current_residents.unit_occupancy_type",
        col("current_residents.unit_address").getItem(0).alias("unit_address"),
        "current_residents.bed", 
        "current_residents.bath",
        "current_residents.resident_name",
        "current_residents.phone_number",
        "current_residents.email",
        "current_residents.occupants",
        "current_residents.original_lease_start",
        "current_residents.lease_id",
        "current_residents.lease_status",
        "current_residents.lease_occupancy_type",
        "current_residents.move_in_date",
        "current_residents.lease_start_date",
        "current_residents.lease_end_date",
        "current_residents.previous_lease_end_date",
        "current_residents.move_out_date",
        "current_residents.lease_term_name",
        "current_residents.lease_term",
        "current_residents.contract_length_months",
        "current_residents.occupied_length_months",
        "current_residents.market_rent"
                )
#dfCurrent_Residents.show()
#dfCurrent_Residents.printSchema()
dfCurrent_Residents.write.mode('overwrite').parquet("abfss://[email protected]/Silver/entities/Reporting/RentRoll")

Error Message

AnalysisException                         Traceback (most recent call last)
Cell In [17], line 12
      8 dfReportData = dfGetReportData.select("response.requestId", "response.code",explode("response.result.reportData").alias("reportData"))
      9 #dfData.printSchema()
     10 #dfData.show()
---> 12 dfRentRoll = dfReportData.select(explode("reportData.current_residents").alias("current_residents"))
     13 #dfRentRoll.printSchema()
     15 dfCurrent_Residents = dfRentRoll.select("current_residents.property_name",
     16        "current_residents.property",
     17         "current_residents.lookup_code",
   (...)
     91         "current_residents.military_rank_band"
     92                 )

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:2023, in DataFrame.select(self, *cols)
   2002 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   2003     """Projects a set of expressions and returns a new :class:`DataFrame`.
   2004 
   2005     .. versionadded:: 1.3.0
   (...)
   2021     [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   2022     """
-> 2023     jdf = self._jdf.select(self._jcols(*cols))
   2024     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/env/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException: Can't extract value from reportData#191: need struct type but got string

Solution

  • The reason you're getting this error is that some of the records inside reportData are plain strings. Spark therefore infers the entire reportData field as string type, and trying to extract a nested value from a string fails with this AnalysisException.


    In this data, the second reportData record is just the string "Done", so every record in the column, including the object-shaped ones, ends up stored as a string.
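
    A minimal sketch (made-up data, not your actual report) that reproduces this inference behaviour:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Two JSON documents: reportData is an object in one and a bare string in the other.
    rows = spark.sparkContext.parallelize([
        '{"reportData": {"current_residents": [{"unit": "101"}]}}',
        '{"reportData": "Done"}',
    ])
    spark.read.json(rows).printSchema()
    # root
    #  |-- reportData: string (nullable = true)
    # The conflicting types are widened to string, so even the object record is
    # kept as raw JSON text and reportData.current_residents cannot be extracted.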

    To avoid this, parse the column yourself with the from_json function, supplying a proper schema for reportData.

    Below, a schema for reportData is inferred from the column's JSON strings and then applied with from_json:

    from pyspark.sql.functions import explode, col, from_json
    from pyspark.sql.types import StructType, StructField, ArrayType, StringType

    # Infer a schema for reportData by re-reading the column's JSON strings.
    schema = spark.read.json(
        dfReportData.select("reportData").rdd.map(lambda row: row.reportData)
    ).schema

    # Parse the string column into a proper struct, then explode the nested array.
    dfReportData = dfReportData.withColumn("json_ex", from_json(col("reportData"), schema)).drop("reportData")
    dfRentRoll = dfReportData.select(explode("json_ex.current_residents").alias("current_residents"))
    display(dfRentRoll)
    

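    If you'd rather not pay for the extra pass over the data that schema inference costs, you could replace the inferred schema above with a hand-written one using the imported StructType helpers. A minimal sketch covering only a few of the fields (extend it with the rest of your columns):

    # Hypothetical hand-written schema for reportData; only a subset of fields shown.
    report_schema = StructType([
        StructField("current_residents", ArrayType(StructType([
            StructField("property_name", StringType()),
            StructField("bldg_unit", StringType()),
            StructField("unit_address", ArrayType(StringType())),
            StructField("market_rent", StringType()),
        ])))
    ])

    dfParsed = dfReportData.withColumn("json_ex", from_json(col("reportData"), report_schema))
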

    From here you can run the rest of your code to build dfCurrent_Residents and write it out to Parquet.

    Note: any record in reportData that doesn't match the schema becomes null after from_json, so check the parsed output carefully before relying on it.
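
    A quick way to spot those rows; a minimal sketch, assuming the json_ex column created above:

    # Count rows where from_json produced null (unparseable or genuinely empty records).
    null_count = dfReportData.filter(col("json_ex").isNull()).count()
    print(f"Unparsed reportData records: {null_count}")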