I have started to learn PySpark, but now I am stuck transforming a JSON document from within a dataframe. (In contrast to the sample data below, my real initial dataframe has more than 2 rows.)
Here is my initial dataframe:
from pyspark.sql.types import StringType, StructType, StructField

df = spark.createDataFrame(["A", "B"], StringType()).toDF("id")
display(df)
The function I call from within the dataframe looks like this:
import json
import requests
from pyspark.sql.functions import udf

def getObjectInformation(id):
    normalized_data = dict()
    theJSONresponse = requests.get("https://someone.somewhere/" + id).json()['value']
    theJSONresponse_dumps = json.dumps(theJSONresponse)
    normalized_data["_data"] = theJSONresponse_dumps
    return normalized_data["_data"]

udf_getObjectInformation = udf(lambda x: getObjectInformation(x))
I call the function from within the dataframe:
df_oid = df.select('id').withColumn('oid', udf_getObjectInformation(df.id))
These are the JSON documents for id=A and id=B:
# normalized_data["_data"] for id=A
[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
{"oid": "2", "id": "A", "type": "this", "oDetails": "{\"c\":\"red\",\"p\":\"book\"}"},
{"oid": "3", "id": "A", "type": "that", "oDetails": "{\"c\":\"green\",\"p\":\"book\"}"}]
# normalized_data["_data"] for id=B
[{"oid": "57", "id": "B", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
{"oid": "59", "id": "B", "type": "that", "oDetails": "{\"c\":\"blue\",\"p\":\"shirt\"}"}]
Now, my struggle begins ...
I want my final dataframe to be like this:
data = [
    ("A", "1", "this", "blue", "fruit"),
    ("A", "2", "this", "red", "book"),
    ("A", "3", "that", "green", "book"),
    ("B", "57", "this", "blue", "fruit"),
    ("B", "59", "that", "blue", "shirt")
]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("oid", StringType(), True),
    StructField("type", StringType(), True),
    StructField("c", StringType(), True),
    StructField("p", StringType(), True)
])
df_final = spark.createDataFrame(data=data,schema=schema)
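For reference, this target displays as:
df_final.show()
# +---+---+----+-----+-----+
# | id|oid|type|    c|    p|
# +---+---+----+-----+-----+
# |  A|  1|this| blue|fruit|
# |  A|  2|this|  red| book|
# |  A|  3|that|green| book|
# |  B| 57|this| blue|fruit|
# |  B| 59|that| blue|shirt|
# +---+---+----+-----+-----+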
Any hint, guidance, or solution is much appreciated.
Your input is a JSON string that contains another JSON string nested inside it. You can parse both levels with from_json and apply the necessary transformations. Note that F.inline, used below, requires pyspark>=3.4; an explode-based alternative for older versions is sketched after the first parse.
Example:
from pyspark.sql import functions as F
from pyspark.sql import types as T
raw_json = '[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"blue\\",\\"p\\":\\"fruit\\"}"}, {"oid": "2", "id": "A", "type": "this", "oDetails": "{\\"c\\":\\"red\\",\\"p\\":\\"book\\"}"}, {"oid": "3", "id": "A", "type": "that", "oDetails": "{\\"c\\":\\"green\\",\\"p\\":\\"book\\"}"}]'
df = spark.createDataFrame([(raw_json, ),], ['json_col'])
df.show(1)
# +--------------------+
# |            json_col|
# +--------------------+
# |[{"oid": "1", "id...|
# +--------------------+
inner_struct_schema = T.StructType([
    T.StructField('id', T.StringType(), True),
    T.StructField('oDetails', T.StringType(), True),
    T.StructField('oid', T.StringType(), True),
    T.StructField('type', T.StringType(), True)
])
json_schema = T.ArrayType(inner_struct_schema)
# Parse the outer JSON array of structs, then explode one row per element
# (F.inline is available in pyspark>=3.4)
parsed_struct = F.from_json('json_col', json_schema)
df2 = df.select(F.inline(parsed_struct))
df2.show(10, False)
# +---+------------------------+---+----+
# |id |oDetails                |oid|type|
# +---+------------------------+---+----+
# |A  |{"c":"blue","p":"fruit"}|1  |this|
# |A  |{"c":"red","p":"book"}  |2  |this|
# |A  |{"c":"green","p":"book"}|3  |that|
# +---+------------------------+---+----+
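If you are on pyspark<3.4, F.inline is not yet available in pyspark.sql.functions; a sketch using explode (assuming the same parsed_struct as above) produces the same rows:
df2_alt = (
    df.select(F.explode(parsed_struct).alias('item'))  # one row per array element
      .select('item.*')                                # expand the struct into columns
)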
odetails_schema = T.StructType([
    T.StructField('c', T.StringType(), True),
    T.StructField('p', T.StringType(), True),
])
# Parse the nested JSON held in the oDetails string column
parsed_detail = F.from_json('oDetails', odetails_schema)
df3 = df2.select(
    F.col('id'),
    F.col('oid'),
    F.col('type'),
    parsed_detail.getField('c').alias('c'),
    parsed_detail.getField('p').alias('p'),
)
df3.show(10, False)
# +---+---+----+-----+-----+
# |id |oid|type|c    |p    |
# +---+---+----+-----+-----+
# |A  |1  |this|blue |fruit|
# |A  |2  |this|red  |book |
# |A  |3  |that|green|book |
# +---+---+----+-----+-----+
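To tie this back to your original dataframe: the same two-step parse can run directly on the UDF output. A minimal sketch, assuming df_oid from your question (its oid column holds the JSON string returned by the service) and the schemas defined above:
df_final = (
    df_oid
    # 'oid' here is the JSON string column; inline() replaces it with the inner fields
    .select(F.inline(F.from_json('oid', json_schema)))
    .select(
        'id', 'oid', 'type',
        F.from_json('oDetails', odetails_schema).getField('c').alias('c'),
        F.from_json('oDetails', odetails_schema).getField('p').alias('p'),
    )
)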