Search code examples
Tags: apache-spark, pyspark, apache-spark-sql

transform a json document from within a pyspark dataframe


I started to learn PySpark, but now I'm stuck with transforming a JSON document from within a dataframe. Note that, unlike my sample data below, the real initial dataframe has more than 2 rows.

My initial dataframe is here:

# Build a one-column dataframe of ids ("A", "B").
# NOTE(review): `display()` is presumably a notebook helper (e.g. Databricks);
# it is not part of PySpark itself — confirm against the runtime environment.
df = spark.createDataFrame(["A", "B"], StringType()).toDF("id")
display(df)

The function I call from within the dataframe looks like this:

def getObjectInformation(id):
    """Fetch the 'value' payload for *id* from the remote service.

    Returns the payload re-serialized as a JSON string, so the UDF yields
    a plain string column.

    Raises requests.HTTPError on a non-2xx response and
    requests.Timeout if the service does not answer in time.
    """
    # NOTE(review): `id` shadows the builtin; the name is kept so the
    # function's interface stays unchanged for existing callers.
    # A timeout is essential here: without one, a stuck request would
    # hang the Spark task indefinitely.
    response = requests.get(f"https://someone.somewhere/{id}", timeout=30)
    response.raise_for_status()  # fail fast with a clear error, not a JSON decode crash
    # The intermediate dict of the original added nothing; return directly.
    return json.dumps(response.json()["value"])

# Pass the function itself — wrapping it in `lambda x: f(x)` is redundant.
udf_getObjectInformation = udf(getObjectInformation)

I call the function from within the dataframe:

# Apply the UDF row-wise: each id gains an 'oid' column holding the raw
# JSON string returned by the service for that id.
df_oid = df.select('id').withColumn('oid', udf_getObjectInformation(df.id))

These are the JSON documents for id=A and id=B

#normalized_data["_data"] for id = A
[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"}, 
{"oid": "2", "id": "A", "type": "this", "oDetails": "{\"c\":\"red\",\"p\":\"book\"}"}, 
{"oid": "3", "id": "A", "type": "that", "oDetails": "{\"c\":\"green\",\"p\":\"book\"}"}]

#normalized_data["_data"] for id=B
[{"oid": "57", "id": "B", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},  
{"oid": "59", "id": "B", "type": "that", "oDetails": "{\"c\":\"blue\",\"p\":\"shirt\"}"}]

Now, my struggle begins ...

I want my final dataframe to be like this:

# Expected rows: (id, oid, type, c, p), taken from the JSON documents above.
data = [
    ("A", "1", "this", "blue", "fruit"),
    ("A", "2", "this", "red", "book"),
    ("A", "3", "that", "green", "book"),   # oid 3 has type "that" in the source JSON
    ("B", "57", "this", "blue", "fruit"),
    ("B", "59", "that", "blue", "shirt"),  # oid 59 has type "that" in the source JSON
  ]

# Target schema: all five columns are nullable strings.
# (Backslash continuations are unnecessary inside brackets, so they are removed.)
schema = StructType([
    StructField("id", StringType(), True),
    StructField("oid", StringType(), True),
    StructField("type", StringType(), True),
    StructField("c", StringType(), True),
    StructField("p", StringType(), True),
  ])

df_final = spark.createDataFrame(data=data, schema=schema)

Any hint, guidance, solution is much appreciated.


Solution

  • Your input is a JSON string that contains another JSON string nested inside it. You can parse both levels and apply the necessary transformations. Here is an example for pyspark>=3.4:

    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    
    # Sample payload for id="A": a JSON array in which each element's
    # "oDetails" field is itself a JSON-encoded string (note the escaped quotes).
    raw_json = '[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, {"oid": "2", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"red\\",\\"p\\":\\"book\\"}"}, {"oid": "3", "id": "A", "type": "that", "oDetails": "{\\"c\\":\\"green\\",\\"p\\":\\"book\\"}"}]'
    
    df = spark.createDataFrame([(raw_json, ),], ['json_col'])
    df.show(1)
    
    # +--------------------+
    # |            json_col|
    # +--------------------+
    # |[{"oid": "1", "id...|
    # +--------------------+
    
    # Schema of one element of the outer JSON array. oDetails is kept as a
    # plain string here because it holds nested (stringified) JSON that is
    # parsed in a second pass below.
    inner_struct_schema = T.StructType([
        T.StructField('id', T.StringType(), True),
        T.StructField('oDetails', T.StringType(), True),
        T.StructField('oid', T.StringType(), True),
        T.StructField('type', T.StringType(), True)
    ])
    json_schema = T.ArrayType(inner_struct_schema)
    # from_json parses the string column into an array of structs;
    # inline() then explodes that array into one row per struct,
    # promoting the struct fields to top-level columns.
    parsed_struct = F.from_json('json_col', json_schema)
    df2 = df.select(F.inline(parsed_struct))
    df2.show(10, False)
    
    # +---+------------------------+---+----+
    # |id |oDetails                |oid|type|
    # +---+------------------------+---+----+
    # |A  |{"c":"blue","p":"fruit"}|1  |this|
    # |A  |{"c":"red","p":"book"}  |2  |this|
    # |A  |{"c":"green","p":"book"}|3  |that|
    # +---+------------------------+---+----+
    
    # Second pass: parse the nested JSON carried in the oDetails string.
    odetails_schema = T.StructType([
        T.StructField('c', T.StringType(), True),
        T.StructField('p', T.StringType(), True),
    ])
    parsed_detail = F.from_json('oDetails', odetails_schema)
    
    df3 = df2.select(
        F.col('id'),
        F.col('oid'),
        F.col('type'),
        # getField pulls a member out of the parsed struct; alias flattens
        # it into a top-level column.
        parsed_detail.getField('c').alias('c'),
        parsed_detail.getField('p').alias('p'),
    )
    df3.show(10, False)
    
    # +---+---+----+-----+-----+
    # |id |oid|type|c    |p    |
    # +---+---+----+-----+-----+
    # |A  |1  |this|blue |fruit|
    # |A  |2  |this|red  |book |
    # |A  |3  |that|green|book |
    # +---+---+----+-----+-----+