I am currently building a transformation script in PySpark. Here is the workflow:
I build a dictionary for each row, load it into a Row, and then into a DataFrame. The trouble is that null columns simply do not appear in the XML rows, so some rows end up with, say, 20 columns and others with 35.
Which leads to:
IllegalStateException: Input row doesn't have expected number of values required by the schema. 35 fields are required while 24 values are provided.
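The same mismatch can be reproduced without the XML part; this is just an illustrative sketch with made-up column names, and the error typically surfaces on the first action:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    Row(id="1", name="Alice", city="Paris"),  # 3 fields
    Row(id="2", name="Bob"),                  # only 2 fields, "city" was null in the XML
]

# The inferred schema is the union of the field names (3 columns),
# but the second Row only provides 2 values for it.
df = spark.createDataFrame(rows)
df.show()  # fails with "3 fields are required while 2 values are provided"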
Here is how I currently go from XML to dict to Row to DataFrame:
from pyspark.sql import Row

xmlTransformedRoot = xml_transformed.getroot()  # root of the XML after the XSLT has been applied
list_of_tables = {child.tag for child in xmlTransformedRoot}  # set of unique table names

Database = []
for table in list_of_tables:
    listData = []
    for child in xmlTransformedRoot.findall(table):
        data = dict()
        for subelem in child:
            # strip the SQL Server namespace from the tag to get the column name
            column = subelem.tag.replace("{urn:schemas-microsoft-com:sql:SqlRowSet}", "")
            text = subelem.text
            data[column] = [text]  # each value is wrapped in a one-element list
        listData.append(data)
    # one Row per dict; this breaks when rows expose different sets of columns
    Database.append((table, spark.createDataFrame(Row(**x) for x in listData)))
Would it be possible to load those rows with a function that allows for schema updates, or should I just create the DataFrame structure beforehand with the maximum number of columns that could exist? (A rough sketch of that second option is below.)
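For reference, a self-contained sketch of what that second option could look like; the column names and types are made up, and I'm assuming every value is a plain string:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Declare every column the table could ever contain, all nullable
full_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("city", StringType(), True),
    # ... one StructField per possible column, up to the 35 maximum
])

rows = [
    {"id": "1", "name": "Alice", "city": "Paris"},
    {"id": "2", "name": "Bob"},  # "city" is absent
]

# With plain dicts and an explicit schema, missing keys simply become null
df = spark.createDataFrame(rows, schema=full_schema)
df.show()

(In my real code I would also store data[column] = text rather than [text], so that the values are actual strings matching the StringType fields.)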
Thanks a lot for your help.
I solved my issue by modifying the unpacking line
Database.append((table,spark.createDataFrame(Row(**x) for x in listData)))
to
Database.append((table,spark.createDataFrame(listData)))
effectively passing the list of dictionaries straight to createDataFrame instead of converting each one to a Row first.
Apparently PySpark handles creating DataFrames from dictionaries just fine, even when some keys are missing from some of them.
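A minimal, self-contained illustration of that behaviour (the column names are invented):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    {"id": "1", "name": "Alice", "city": "Paris"},
    {"id": "2", "name": "Bob"},  # no "city" key at all
]

# Schema inference takes the union of all the keys; missing ones come back as null
df = spark.createDataFrame(rows)
df.show()  # the second row simply has null in the "city" column

Depending on the PySpark version, this may log a warning that inferring the schema from dicts is deprecated in favour of Row; passing an explicit schema, as in the sketch above, avoids both the warning and the inference step.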