I have parquet data coming from SQL Server with this schema:
root
|-- user_uid: string (nullable = true)
|-- user_email: string (nullable = true)
|-- ud_id: integer (nullable = true)
|-- ud_standard_workflow_id: integer (nullable = true)
|-- ud_is_preview: boolean (nullable = true)
|-- ud_is_completed: boolean (nullable = true)
|-- ud_language: string (nullable = true)
|-- ud_created_date: timestamp (nullable = true)
|-- ud_modified_date: string (nullable = true)
|-- ud_created_by_id: string (nullable = true)
|-- dsud_id: integer (nullable = true)
|-- dsud_user_data_id: integer (nullable = true)
|-- dsud_dynamic_step_id: integer (nullable = true)
|-- dsud_is_completed: boolean (nullable = true)
|-- dsud_answers: string (nullable = true)
The last column, dsud_answers, is a string, but it contains JSON data in the form of a list:
[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizenship","Value":"66664"}]
How can I transform this column into a proper JSON data type? I keep getting this error: data type mismatch: Input schema "STRING" must be a struct, an array or a map.
My desired result is to have the dsud_answers column as a JSON data type so I can flatten its content. In this case there would be 2 records, because the JSON contains 2 QuestionIds.
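For what it is worth, that exact error is what from_json raises when the schema argument is a plain string type instead of a struct, array, or map; assuming that is what happened, a hypothetical reproduction would be:

from pyspark.sql.functions import col, from_json

# Fails with: data type mismatch: Input schema "STRING" must be a struct, an array or a map
df = df.withColumn("dsud_answers", from_json(col("dsud_answers"), "string"))

Passing a schema that actually describes the JSON (an array of structs, as in the answer below) avoids the error.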
I managed to do the transformation in Pandas but cannot figure out the PySpark way. I converted the PySpark DataFrame to a Pandas DataFrame and then looped through all the rows:
import json
import pandas as pd
from pyspark.sql.functions import to_timestamp

def batch_function(df_answers, batch_id):
    # Drop rows with empty answer lists, cast the modified date, de-duplicate,
    # and bring the batch into Pandas
    df = (df_answers
          .filter(df_answers.dsud_answers != '[]')
          .withColumn("ud_modified_date", to_timestamp(df_answers.ud_modified_date))
          .drop_duplicates()
          .toPandas())
    df_final = pd.DataFrame()
    # Loop through the rows: flatten each JSON answer list into one record per
    # question, then attach the parent row's metadata columns
    for index in df.index:
        df_attributes = pd.json_normalize(json.loads(df.dsud_answers[index]))
        df_attributes['dsud_id'] = df.dsud_id[index]
        df_attributes['dsud_user_data_id'] = df.dsud_user_data_id[index]
        df_attributes['dsud_dynamic_step_id'] = df.dsud_dynamic_step_id[index]
        df_attributes['ud_language'] = df.ud_language[index]
        df_attributes['ud_created_date'] = pd.to_datetime(df.ud_created_date[index])
        df_attributes['ud_created_by_id'] = df.ud_created_by_id[index]
        df_attributes['ud_modified_date'] = df.ud_modified_date[index]
        df_attributes['user_email'] = df.user_email[index]
        df_attributes['ud_standard_workflow_id'] = df.ud_standard_workflow_id[index]
        df_attributes['user_uid'] = df.user_uid[index]
        # These three columns are not in the schema shown above; they come from
        # the wider dataset this job actually runs against
        df_attributes['wrn_is_completed'] = df.wrn_is_completed[index]
        df_attributes['wrn_is_agreed'] = df.wrn_is_agreed[index]
        df_attributes['wf_name'] = df.wf_name[index]
        df_final = pd.concat([df_final, df_attributes], ignore_index=True)
    df_answers = spark.createDataFrame(df_final)
    df_answers.write.mode("append").format("delta").saveAsTable("final_table")
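As an aside, the batch_function(df_answers, batch_id) signature is the one Structured Streaming's foreachBatch expects, so presumably it is wired up roughly like this (the source table name and checkpoint path below are placeholders, not from the original post):

(spark.readStream
    .table("source_table")  # placeholder streaming source
    .writeStream
    .foreachBatch(batch_function)
    .option("checkpointLocation", "/tmp/checkpoints/answers")  # placeholder path
    .start())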
To transform the dsud_answers column from a string into a proper structured (JSON) type in PySpark, define a schema for the array and parse the column with from_json. Starting from a DataFrame with this schema:
df.printSchema()
root
|-- dsud_answers: string (nullable = true)
|-- dsud_dynamic_step_id: long (nullable = true)
|-- dsud_id: long (nullable = true)
|-- dsud_is_completed: boolean (nullable = true)
|-- dsud_user_data_id: long (nullable = true)
|-- ud_created_by_id: string (nullable = true)
|-- ud_created_date: string (nullable = true)
|-- ud_id: long (nullable = true)
|-- ud_is_completed: boolean (nullable = true)
|-- ud_is_preview: boolean (nullable = true)
|-- ud_language: string (nullable = true)
|-- ud_modified_date: string (nullable = true)
|-- ud_standard_workflow_id: long (nullable = true)
|-- user_email: string (nullable = true)
|-- user_uid: string (nullable = true)
I have tried the approach below:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Schema for the JSON: an array of objects with QuestionId, QuestionTitle and Value
schema = ArrayType(StructType([
    StructField("QuestionId", IntegerType()),
    StructField("QuestionTitle", StringType()),
    StructField("Value", StringType())
]))

# Parse the string column into an array<struct> column
df = df.withColumn("dsud_answers", from_json(col("dsud_answers"), schema))
df.show(truncate=False)
Results (one row per input record, shown vertically for readability):

Row 1:
  dsud_answers: [{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizenship","Value":"66664"}]
  dsud_dynamic_step_id: 201, dsud_id: 100, dsud_is_completed: false, dsud_user_data_id: 101
  ud_created_by_id: user1, ud_created_date: 2022-01-01 10:00:00, ud_id: 1
  ud_is_completed: false, ud_is_preview: true, ud_language: en, ud_modified_date: 2022-01-02 09:00:00
  ud_standard_workflow_id: 1001, user_email: [email protected], user_uid: 12345

Row 2:
  dsud_answers: [{"QuestionId":6408,"QuestionTitle":"Education","Value":"Bachelor"},{"QuestionId":6409,"QuestionTitle":"Occupation","Value":"Engineer"}]
  dsud_dynamic_step_id: 202, dsud_id: 200, dsud_is_completed: true, dsud_user_data_id: 201
  ud_created_by_id: user2, ud_created_date: 2022-01-03 14:00:00, ud_id: 2
  ud_is_completed: true, ud_is_preview: false, ud_language: es, ud_modified_date: 2022-01-04 10:00:00
  ud_standard_workflow_id: 1002, user_email: [email protected], user_uid: 67890
In the code above, I define a schema for the JSON data: an array of objects, each containing three fields: QuestionId (integer), QuestionTitle (string), and Value (string). Using withColumn, I parse the "dsud_answers" string column with from_json into the structured format defined by that schema. The result is a column of the same name containing the parsed JSON as an array of structs.
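From here, getting the desired one-record-per-QuestionId flattening is one explode away. A minimal sketch, assuming the parsed df from above (df_flat and the alias "answer" are names I am introducing for illustration):

from pyspark.sql.functions import col, explode

# explode() emits one output row per element of the parsed array
df_flat = (df
    .withColumn("answer", explode(col("dsud_answers")))
    .select("*",
            col("answer.QuestionId").alias("QuestionId"),
            col("answer.QuestionTitle").alias("QuestionTitle"),
            col("answer.Value").alias("Value"))
    .drop("dsud_answers", "answer"))
df_flat.show(truncate=False)

A row whose JSON holds two QuestionIds produces two output rows, matching the desired result. If you would rather not hand-write the schema, schema_of_json(lit('[{"QuestionId":1,"QuestionTitle":"t","Value":"v"}]')) can infer it from a sample string.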