Tags: json, pyspark, azure-databricks

Convert parquet column to Json


I have a Parquet file coming from SQL Server with this schema:

root
 |-- user_uid: string (nullable = true)
 |-- user_email: string (nullable = true)
 |-- ud_id: integer (nullable = true)
 |-- ud_standard_workflow_id: integer (nullable = true)
 |-- ud_is_preview: boolean (nullable = true)
 |-- ud_is_completed: boolean (nullable = true)
 |-- ud_language: string (nullable = true)
 |-- ud_created_date: timestamp (nullable = true)
 |-- ud_modified_date: string (nullable = true)
 |-- ud_created_by_id: string (nullable = true)
 |-- dsud_id: integer (nullable = true)
 |-- dsud_user_data_id: integer (nullable = true)
 |-- dsud_dynamic_step_id: integer (nullable = true)
 |-- dsud_is_completed: boolean (nullable = true)
 |-- dsud_answers: string (nullable = true)

The last column, dsud_answers, is a string, but it contains JSON data in the form of a list:

[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizentship","Value":"66664"}]

How can I transform this column into a proper JSON data type? I keep getting this error: data type mismatch: Input schema "STRING" must be a struct, an array or a map.

My desired result is to have the dsud_answers column as a JSON data type so I can flatten its content. In this case there would be 2 records, because the JSON contains 2 QuestionIds.

I managed to transform it in Pandas but cannot figure out the PySpark way to do it. I converted the PySpark DataFrame to a Pandas DataFrame and then looped through all rows.
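For illustration, the per-row parsing step that my loop below relies on can be run in isolation, using the sample dsud_answers value from above with json.loads plus pandas.json_normalize:

```python
import json

import pandas as pd

# Sample value of the dsud_answers column for a single row (copied from above)
raw = ('[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},'
       '{"QuestionId":6407,"QuestionTitle":"Citizentship","Value":"66664"}]')

row_json = json.loads(raw)                # -> list of two dicts
normalized = pd.json_normalize(row_json)  # -> DataFrame: 2 rows, 3 columns
print(normalized)
```

Each element of the JSON array becomes one row, which is exactly the flattening I am after.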

import json

import pandas as pd
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructField, IntegerType, TimestampType, BooleanType

def batch_function(df_answers, batch_id):

    df = (df_answers.select("*").filter(df_answers.dsud_answers != '[]')
          .withColumn("ud_modified_date", to_timestamp(df_answers.ud_modified_date))
          .drop_duplicates()
          .toPandas())


    df_attributes = pd.DataFrame()
    df_final = pd.DataFrame()
    # Loop through the data to fill the dataframe
    for index in df.index:

        indexId = df.dsud_id[index]
        userDataId = df.dsud_user_data_id[index]
        dynamicStepId = df.dsud_dynamic_step_id[index]
        languageID = df.ud_language[index]
        createdDate = pd.to_datetime(df.ud_created_date[index])
        createdBy = df.ud_created_by_id[index]
        modifiedDate = df.ud_modified_date[index]
        email = df.user_email[index]
        workflowId = df.ud_standard_workflow_id[index]
        uid = df.user_uid[index]
        completed = df.wrn_is_completed[index]
        agreed = df.wrn_is_agreed[index]
        flow_name = df.wf_name[index]

        row_json = json.loads(df.dsud_answers[index])
        normalized_row = pd.json_normalize(row_json)
        
        df_attributes = pd.concat([df_attributes, normalized_row], ignore_index=True) 
        df_attributes['dsud_user_data_id'] = userDataId
        df_attributes['dsud_id'] = indexId
        df_attributes['dsud_dynamic_step_id'] = dynamicStepId
        df_attributes['ud_language'] = languageID
        df_attributes['ud_created_date'] = createdDate
        df_attributes['ud_created_by_id'] = createdBy
        df_attributes['ud_modified_date'] = modifiedDate
        df_attributes['user_email'] = email
        df_attributes['ud_standard_workflow_id'] = workflowId
        df_attributes['user_uid'] = uid
        df_attributes['wrn_is_completed'] = completed
        df_attributes['wrn_is_agreed'] = agreed
        df_attributes['wf_name'] = flow_name

        df_attributes = df_attributes.reset_index(drop=True)
        df_final = pd.concat([df_final, df_attributes])

    df_answers = spark.createDataFrame(df_final)

    df_answers.write.mode("append").format("delta").saveAsTable("final_table")  

Solution

  • To transform the dsud_answers column from a string into a JSON data type in PySpark, start from this schema:

    df.printSchema()
    root
     |-- dsud_answers: string (nullable = true)
     |-- dsud_dynamic_step_id: long (nullable = true)
     |-- dsud_id: long (nullable = true)
     |-- dsud_is_completed: boolean (nullable = true)
     |-- dsud_user_data_id: long (nullable = true)
     |-- ud_created_by_id: string (nullable = true)
     |-- ud_created_date: string (nullable = true)
     |-- ud_id: long (nullable = true)
     |-- ud_is_completed: boolean (nullable = true)
     |-- ud_is_preview: boolean (nullable = true)
     |-- ud_language: string (nullable = true)
     |-- ud_modified_date: string (nullable = true)
     |-- ud_standard_workflow_id: long (nullable = true)
     |-- user_email: string (nullable = true)
     |-- user_uid: string (nullable = true)
    

    I have tried the below approach:

    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql.types import ArrayType
    schema = ArrayType(StructType([
        StructField("QuestionId", IntegerType()),
        StructField("QuestionTitle", StringType()),
        StructField("Value", StringType())
    ]))
    df = df.withColumn("dsud_answers", from_json(col("dsud_answers"), schema))
    df.show(truncate=False)
    

    Results:

    dsud_answers    dsud_dynamic_step_id    dsud_id dsud_is_completed   dsud_user_data_id   ud_created_by_id    ud_created_date ud_id   ud_is_completed ud_is_preview   ud_language ud_modified_date    ud_standard_workflow_id user_email  user_uid
    [{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizenship","Value":"66664"}]  201 100 false   101 user1   2022-01-01 10:00:00 1   false   true    en  2022-01-02 09:00:00 1001    [email protected]   12345
    [{"QuestionId":6408,"QuestionTitle":"Education","Value":"Bachelor"},{"QuestionId":6409,"QuestionTitle":"Occupation","Value":"Engineer"}]    202 200 true    201 user2   2022-01-03 14:00:00 2   true    false   es  2022-01-04 10:00:00 1002    [email protected]   67890
    


    In the code above, I first define a schema for the JSON data: an array of structs, each with three fields, QuestionId (integer), QuestionTitle (string), and Value (string). Then, using withColumn with from_json, I parse the string column dsud_answers into that structure. The result is a column with the same name, dsud_answers, that now holds the parsed JSON data instead of a raw string.