I am trying to read multiple json files from a folder, explode them using dataframe and write it onto a location as delta.
Below is my sample json file:
'{"result": [{"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 08:36:35", "opened_by": "Kay", "priority": "4 - Low", "sys_created_by": "[email protected]", "sys_created_on": "08/10/2016 08:36:35", "urgency": "3 - Low"}, {"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 10:13:11", "opened_by": "Alan", "priority": "4 - Low", "sys_created_on": "08/10/2016 10:13:11", "urgency": "3 - Low"}, ................
But Im getting the below error:
AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(Result)
Any suggestion or help to resolve this error ?
I am using the below code:
from pyspark.sql.functions import *
import glob
spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", True)
for file in glob.glob(f"/dbfs/mnt/landing/item/2023*/*.json"):
file = file[5:]
df = spark.read.option('multiline',True).json(file)
df_exp = df.select(explode(col("result")).alias('Result')).select('Result.*')
df_explode = df_exp.drop_duplicates()
df_explode.write.format('delta').mode('append').option('inferSchema',True).save('/mnt/landing/finalitem/')
The error message "AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(Result)" means that you are trying to use the explode function on a column that is not structured like a list or an array. Basically, you can only use the explode function on columns that have a specific structure, not just any column.
I have tried the below code:
I have used a JSON file as input for PySpark code to Read, Explode
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, col
spark = SparkSession.builder.appName("JSONExplode").getOrCreate()
json_file_path = "/FileStore/tables/sample020202.json"
df = spark.read.option('multiline', True).json(json_file_path)
df_exp = df.select(explode_outer(col("result")).alias('Result')).select('Result.*')
df_exp.show()
df_explode = df_exp.dropDuplicates()
delta_destination_path = "/FileStore/tables/finalitem"
df_explode.write.format('delta').mode('append').option('inferSchema', True).save(delta_destination_path)
In the above code I have used explode_outer
function on the "result" column
explode_outer(col("result"))
to explode the elements within the array. The use of explode_outer
suggests that it is designed to handle potential null values within the array.
To check if "result" column is correctly recognized as an array of structs, you can check the schema.