Tags: apache-spark, pyspark

Combine fields in a nested json file into a dataframe


I have this code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, schema_of_json, lit

spark = SparkSession.builder.getOrCreate()

s = '{"job_id":"123","settings":{"task":[{"taskname":"task1","notebook_task":{"notebook_path":"path1"}},{"taskname":"task2","notebook_task":{"notebook_path":"path2"}}]}}'

schema = schema_of_json(lit(s))

result_df = (
    spark.createDataFrame([s], "string")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.job_id", explode("data.settings.task.taskname").alias("taskname"))
    )

result_df.show()

Which generates this:

+------+--------+
|job_id|taskname|
+------+--------+
|   123|   task1|
|   123|   task2|
+------+--------+

How can I add the notebook_path field to the dataframe? It seems explode can't generate more than one field, and I can't put two explode calls in the same select. I know I could create an additional dataframe (job_id, notebook_path) and then join the two dataframes on job_id. I'm just wondering whether a better solution is available.


Solution

  • You should explode the array itself, which is task, not taskname. Each exploded element is a struct, so you can then access its fields (taskname, notebook_task.notebook_path) directly.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    s = '{"job_id":"123","settings":{"task":[{"taskname":"task1","notebook_task":{"notebook_path":"path1"}},{"taskname":"task2","notebook_task":{"notebook_path":"path2"}}]}}'
    
    schema = F.schema_of_json(F.lit(s))
    df = (
        spark.createDataFrame([s], 'string')
        .select(F.from_json(F.col('value'), schema).alias('data'))
    )
    
    cols = [
        F.col('data').getField('job_id').alias('job_id'),
        F.explode('data.settings.task').alias('task'),
    ]
    df2 = df.select(cols)
    
    final_cols = [
        F.col('job_id'),
        F.col('task.taskname').alias('taskname'),
        F.col('task.notebook_task.notebook_path').alias('notebook_path'),
    ]
    result_df = df2.select(final_cols)
    result_df.show(10, False)
    
    # +------+--------+-------------+
    # |job_id|taskname|notebook_path|
    # +------+--------+-------------+
    # |123   |task1   |path1        |
    # |123   |task2   |path2        |
    # +------+--------+-------------+
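
    The intermediate dataframes above are shown for clarity, but the same transformation can also be written as one chained select, since nested struct fields can be referenced with dotted paths after the explode. A minimal sketch of that variant (same sample JSON and schema as above):

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    s = '{"job_id":"123","settings":{"task":[{"taskname":"task1","notebook_task":{"notebook_path":"path1"}},{"taskname":"task2","notebook_task":{"notebook_path":"path2"}}]}}'

    # Infer the schema from the sample string, as in the answer above.
    schema = F.schema_of_json(F.lit(s))

    result_df = (
        spark.createDataFrame([s], 'string')
        .select(F.from_json(F.col('value'), schema).alias('data'))
        # Explode the task array; each row now holds one task struct.
        .select('data.job_id', F.explode('data.settings.task').alias('task'))
        # Dotted paths pull struct fields out; the column takes the last field's name.
        .select('job_id', 'task.taskname', 'task.notebook_task.notebook_path')
    )
    result_df.show(10, False)
    ```

    Selecting 'task.notebook_task.notebook_path' yields a column named notebook_path, so no explicit alias is needed unless you want a different name.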