Tags: python, azure, pyspark, databricks

Explode multiple JSON files using Python


I am trying to read multiple JSON files from a folder, explode the nested array in each one into a DataFrame, and write the result to a location as Delta.

Below is my sample JSON file:

'{"result": [{"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 08:36:35", "opened_by": "Kay", "priority": "4 - Low", "sys_created_by": "[email protected]", "sys_created_on": "08/10/2016 08:36:35", "urgency": "3 - Low"}, {"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 10:13:11", "opened_by": "Alan", "priority": "4 - Low", "sys_created_on": "08/10/2016 10:13:11", "urgency": "3 - Low"}, ................

But I'm getting the below error: AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(Result)

Any suggestion or help to resolve this error?

I am using the below code:

from pyspark.sql.functions import *
import glob

spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", True)

for file in glob.glob("/dbfs/mnt/landing/item/2023*/*.json"):
    file = file[5:]  # strip the local /dbfs prefix so Spark reads the DBFS path
    df = spark.read.option('multiline', True).json(file)
    df_exp = df.select(explode(col("result")).alias('Result')).select('Result.*')
    df_explode = df_exp.drop_duplicates()
    df_explode.write.format('delta').mode('append').option('inferSchema', True).save('/mnt/landing/finalitem/')

Solution

  • The error message "AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(Result)" is raised by the star expansion select('Result.*'), not by explode itself. The 'Result.*' syntax only works when Result is a struct column; if Spark has inferred "result" differently for one of the files (for example as a string, or as an array of strings), the exploded Result is not a struct and the star expansion fails with exactly this message.
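
    A minimal sketch that reproduces the failure (the toy DataFrame below is made up for illustration): the array elements are plain strings rather than structs, so the star expansion raises the same kind of AnalysisException (exact wording varies by Spark version).

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, col

    spark = SparkSession.builder.getOrCreate()

    # "result" is an array of strings, so each exploded Result is a string, not a struct
    df = spark.createDataFrame([(["a", "b"],)], ["result"])
    exploded = df.select(explode(col("result")).alias("Result"))

    exploded.printSchema()       # Result: string
    exploded.select("Result.*")  # AnalysisException: Can only star expand struct data types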

    I tried the below code, using one of the sample JSON files as input for a PySpark read-and-explode:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode_outer, col

    spark = SparkSession.builder.appName("JSONExplode").getOrCreate()

    json_file_path = "/FileStore/tables/sample020202.json"
    df = spark.read.option('multiline', True).json(json_file_path)

    # explode the "result" array into one row per element,
    # then star-expand the struct into top-level columns
    df_exp = df.select(explode_outer(col("result")).alias('Result')).select('Result.*')
    df_exp.show()

    df_explode = df_exp.dropDuplicates()

    # inferSchema is a read option and has no effect on a Delta write, so it is omitted
    delta_destination_path = "/FileStore/tables/finalitem"
    df_explode.write.format('delta').mode('append').save(delta_destination_path)
    


    In the above code I used explode_outer(col("result")) on the "result" column to explode the elements of the array. Unlike explode, which drops rows whose array is null or empty, explode_outer keeps them and emits a null row instead, which makes it safer for inconsistent input files. The comparison below shows the difference.
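
    A minimal sketch of the difference (the toy DataFrame is made up for illustration):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, explode_outer, col

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, ["a", "b"]), (2, None)], ["id", "vals"])

    # explode drops the row whose array is null: only id=1 survives
    df.select("id", explode(col("vals")).alias("val")).show()

    # explode_outer keeps it, pairing id=2 with a null value
    df.select("id", explode_outer(col("vals")).alias("val")).show()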

    To check whether the "result" column is correctly recognized as an array of structs, you can inspect the schema with printSchema, as in the sketch below.
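
    For this sample input, the schema should look roughly like the following (field list abbreviated):

    df.printSchema()
    # root
    #  |-- result: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- approval: string (nullable = true)
    #  |    |    |-- assigned_to: string (nullable = true)
    #  |    |    |-- assignment_group: string (nullable = true)
    #  |    |    ...

    Finally, since the question reads many files, note that spark.read.json also accepts glob patterns, so the per-file loop can be replaced by a single read, assuming the files share a compatible layout. A sketch:

    from pyspark.sql.functions import explode_outer, col

    # one read over all matching files; Spark infers the schema across them
    df_all = spark.read.option('multiline', True).json('/mnt/landing/item/2023*/*.json')

    df_final = (df_all
                .select(explode_outer(col('result')).alias('Result'))
                .select('Result.*')
                .dropDuplicates())

    df_final.write.format('delta').mode('append').save('/mnt/landing/finalitem/')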