Tags: json, apache-spark, pyspark, apache-spark-sql, multiple-columns

How to expand dataframe column of JSON string type to rows and columns


A column in my table contains a JSON array stored as a string; each JSON object in the array represents one timestamp. Each record is keyed by 'sheet', and the related data ("param_value") is a JSON array holding the parameter values for each timestamp. I want to transform it into rows keyed by 'sheet', 'equipment', and 'point'. I have already referred to this post, but I can't use '*' to select all the fields to expand, because I can't be sure of the schema in advance (this is an ETL job), and it seems I need to build the schema with a StructType (a rough sketch of that approach is shown after the expected result below).

The table looks like:

sheet  equip  param_value
a1     E1     [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]
a2     E1     [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]
a3     E1     [{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]

The expected result:

sheet  equipment  point  status  log
a1     E1         1      no      no
a1     E1         2      ok      no
a1     E1         3      ok      ok
a2     E1         1      no      no
a2     E1         2      ok      no
a2     E1         3      ok      ok
a3     E1         1      no      no
a3     E1         2      ok      no
a3     E1         3      ok      ok
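
For context, a StructType version of the schema might look roughly like the sketch below. This is only an illustration: it assumes the table above is already loaded as a dataframe df and that the field names ('point', 'status', 'log') are known up front, which is exactly what I don't know in my ETL job.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Illustrative schema only: the field names are assumed from the sample rows.
param_schema = ArrayType(StructType([
    StructField('point', StringType()),
    StructField('status', StringType()),
    StructField('log', StringType()),
]))

result = (
    df.withColumn('param_value', F.from_json('param_value', param_schema))
      .select('sheet', 'equip', F.explode('param_value').alias('p'))
      .select('sheet', 'equip', 'p.*')
)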

Update:

As ZygD suggested, we can define the schema as a DDL-formatted string. With that, I successfully built the schema for multiple columns without needing to know each field name in advance. However, I still need a list containing the field names. Is there a way to extract it from the JSON array? (The schema of every object in the array is the same; a possible approach is sketched after the output below.)

schema_list = ['point', 'status', 'log']  # -> need to extract from the JSON array
schema = 'array<struct<' + ','.join(c + ':string' for c in schema_list) + '>>'
print(schema)  # array<struct<point:string,status:string,log:string>>
f = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{schema}'))")

f.show()
+-----+-----+-----+------+---+
|sheet|equip|point|status|log|
+-----+-----+-----+------+---+
|   a1|   E1|    1|    no| no|
|   a1|   E1|    2|    ok| no|
|   a1|   E1|    3|    ok| ok|
|   a2|   E1|    1|    no| no|
|   a2|   E1|    2|    ok| no|
|   a2|   E1|    3|    ok| ok|
|   a3|   E1|    1|    no| no|
|   a3|   E1|    2|    ok| no|
|   a3|   E1|    3|    ok| ok|
+-----+-----+-----+------+---+
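
One possible way to build schema_list from the data itself is to parse a single sample value on the driver. This is just a sketch: it assumes every object in the array has the same keys and that the single-quoted values parse as Python literals (they are not strictly valid JSON).

import ast

# Take one sample value and read the keys of its first object.
sample = df.select('param_value').first()['param_value']
schema_list = list(ast.literal_eval(sample)[0].keys())
print(schema_list)  # ['point', 'status', 'log']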

Solution

  • To extract data from a JSON string you need from_json, which requires a schema. If you define the schema as a DDL-formatted string, you can pass the from_json result straight to inline, which expands the array of structs into rows and columns.

    Input dataframe:

    df = spark.createDataFrame(
        [('a1', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
         ('a2', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]"),
         ('a3', 'E1', "[{'point':'1','status':'no','log':'no'},{'point':'2','status':'ok','log':'no'},{'point':'3','status':'ok','log':'ok'}]")],
        ['sheet', 'equip', 'param_value'])
    

    Script:

    schema = "array<struct<point:string,status:string,log:string>>"
    df = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{schema}'))")
    
    df.show()
    # +-----+-----+-----+------+---+
    # |sheet|equip|point|status|log|
    # +-----+-----+-----+------+---+
    # |   a1|   E1|    1|    no| no|
    # |   a1|   E1|    2|    ok| no|
    # |   a1|   E1|    3|    ok| ok|
    # |   a2|   E1|    1|    no| no|
    # |   a2|   E1|    2|    ok| no|
    # |   a2|   E1|    3|    ok| ok|
    # |   a3|   E1|    1|    no| no|
    # |   a3|   E1|    2|    ok| no|
    # |   a3|   E1|    3|    ok| ok|
    # +-----+-----+-----+------+---+
    

    If you prefer not to provide a schema, you can infer it, but this is less efficient:

    schema = f"array<{spark.read.json(df.rdd.map(lambda r: r.param_value)).schema.simpleString()}>"
    df = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{schema}'))")
    
    df.show()
    # +-----+-----+---+-----+------+
    # |sheet|equip|log|point|status|
    # +-----+-----+---+-----+------+
    # |   a1|   E1| no|    1|    no|
    # |   a1|   E1| no|    2|    ok|
    # |   a1|   E1| ok|    3|    ok|
    # |   a2|   E1| no|    1|    no|
    # |   a2|   E1| no|    2|    ok|
    # |   a2|   E1| ok|    3|    ok|
    # |   a3|   E1| no|    1|    no|
    # |   a3|   E1| no|    2|    ok|
    # |   a3|   E1| ok|    3|    ok|
    # +-----+-----+---+-----+------+
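
    Another option is to infer the schema from a single sample value instead of scanning the whole column. The sketch below uses schema_of_json; it assumes a recent Spark version that infers a top-level JSON array as array<struct<...>>, that every row shares the same structure, that the default JSON options accept the single-quoted values, and that df is the original input dataframe:

    from pyspark.sql import functions as F

    # Derive a DDL string from one sample value (assumes all rows look alike).
    sample = df.select('param_value').first()['param_value']
    ddl = spark.range(1).select(F.schema_of_json(sample).alias('s')).first()['s']
    print(ddl)  # e.g. ARRAY<STRUCT<log: STRING, point: STRING, status: STRING>>

    result = df.selectExpr("sheet", "equip", f"inline(from_json(param_value, '{ddl}'))")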