I have a below dataframe which i need to transform as below.
i am using PySpark 3.4.1 .
+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID |SH |
+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|64ecb76b5e6452.63046706|[{"AJ":0,"BW":888300,"CA":0,"CF":0,"DT":25,"EP":378368,"ML":76.0,"FP":{"AD":44098,"DD":46,"NA":44098,"NW":46},"NJ":{"FD":26,"PL":6,"TH":0.0}},{"AJ":0,"BW":887749,"CA":0,"CF":0,"DT":25,"EP":438380,"ML":72.0,"FP":{"AD":44106,"DD":46,"NA":44099,"NW":46},"NJ":{"FD":25,"PL":5,"TH":0.0}},{"AJ":0,"BW":870441,"CA":0,"CF":0,"DT":25,"EP":498392,"ML":67.0,"FP":{"AD":44098,"DD":46,"NA":44098,"NW":46},"NJ":{"FD":26,"PL":6,"TH":0.0}},{"AJ":0,"BW":875936,"CA":0,"CF":0,"DT":25,"EP":558404,"ML":74.0,"FP":{"AD":44099,"DD":46,"NA":44099,"NW":46},"NJ":{"FD":25,"PL":5,"TH":0.0}},{"AJ":0,"BW":878518,"CA":0,"CF":0,"DT":23,"EP":618417,"IL":62.288616,"ML":82.0,"FP":{"AD":44105,"DD":49,"NA":44105,"NW":49},"NJ":{"FD":25,"PL":5,"TH":0.0}}]|
+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I am creating this dataframe in a Spark Structured Streaming job after consuming from Kafka, using the `get_json_object` function to extract ID and SH from the JSON object.
Now I need to transform it as below.
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|EN |ID |EP |SH_AJ|SH_BW |SH_CA|SH_CF|SH_DT|SH_EP |SH_FP_AD|SH_FP_DD|SH_FP_NA|SH_FP_NW|SH_NJ_FD|SH_NJ_PL|SH_NJ_TH|
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0 |674969.0|0.0 |0.0 |25.0 |78727.0|44099.0 |45.0 |44099.0 |45.0 |21.0 |1.0 |0.0 |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0 |674969.0|0.0 |0.0 |25.0 |78727.0|44099.0 |45.0 |44099.0 |45.0 |21.0 |1.0 |0.0 |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0 |674969.0|0.0 |0.0 |25.0 |78727.0|44099.0 |45.0 |44099.0 |45.0 |21.0 |1.0 |0.0 |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
i have tried explode and explode_outer tried other stuff as well but i am not able to explode json array.
This is the actual JSON which I am getting from Kafka.
{
"ID":"64ecb76b5e6452.63046706",
"SH":[
{
"AJ":0,
"BW":888300,
"CA":0,
"CF":0,
"DT":25,
"EP":378368,
"ML":76.000000,
"FP":{
"AD":44098,
"DD":46,
"NA":44098,
"NW":46
},
"NJ":{
"FD":26,
"PL":6,
"TH":0.000000
}
},
{
"AJ":0,
"BW":887749,
"CA":0,
"CF":0,
"DT":25,
"EP":438380,
"ML":72.000000,
"FP":{
"AD":44106,
"DD":46,
"NA":44099,
"NW":46
},
"NJ":{
"FD":25,
"PL":5,
"TH":0.000000
}
},
{
"AJ":0,
"BW":870441,
"CA":0,
"CF":0,
"DT":25,
"EP":498392,
"ML":67.000000,
"FP":{
"AD":44098,
"DD":46,
"NA":44098,
"NW":46
},
"NJ":{
"FD":26,
"PL":6,
"TH":0.000000
}
},
{
"AJ":0,
"BW":875936,
"CA":0,
"CF":0,
"DT":25,
"EP":558404,
"ML":74.000000,
"FP":{
"AD":44099,
"DD":46,
"NA":44099,
"NW":46
},
"NJ":{
"FD":25,
"PL":5,
"TH":0.000000
}
},
{
"AJ":0,
"BW":878518,
"CA":0,
"CF":0,
"DT":23,
"EP":618417,
"IL":62.288616,
"ML":82.000000,
"FP":{
"AD":44105,
"DD":49,
"NA":44105,
"NW":49
},
"NJ":{
"FD":25,
"PL":5,
"TH":0.000000
}
}
]
}
The schema of your dataframe seems to be only strings.
df.show()
+--------------------+--------------------+
| ID| SH|
+--------------------+--------------------+
|64ecb76b5e6452.63...|[{"AJ":0,"BW":888...|
+--------------------+--------------------+
df.printSchema()
root
|-- ID: string (nullable = true)
|-- SH: string (nullable = true)
Therefore, you need to input a schema to be able to do anything with your data.
from pyspark.sql import functions as F, types as T

# Explicit schema for the SH column: SH arrives from Kafka as a plain JSON
# *string*, so Spark cannot explode it until it is parsed into a typed
# array<struct<...>> with from_json.
schema = T.ArrayType(
    T.StructType(
        [
            T.StructField("AJ", T.IntegerType()),
            T.StructField("BW", T.IntegerType()),
            T.StructField("CA", T.IntegerType()),
            T.StructField("CF", T.IntegerType()),
            T.StructField("DT", T.IntegerType()),
            T.StructField("EP", T.IntegerType()),
            # IL is optional in the payload (only the last element has it);
            # from_json yields null where the key is absent.
            T.StructField("IL", T.FloatType()),
            T.StructField("ML", T.FloatType()),
            T.StructField(
                "FP",
                T.StructType(
                    [
                        T.StructField("AD", T.IntegerType()),
                        T.StructField("DD", T.IntegerType()),
                        T.StructField("NA", T.IntegerType()),
                        T.StructField("NW", T.IntegerType()),
                    ]
                ),
            ),
            T.StructField(
                "NJ",
                T.StructType(
                    [
                        T.StructField("FD", T.IntegerType()),
                        T.StructField("PL", T.IntegerType()),
                        T.StructField("TH", T.FloatType()),
                    ]
                ),
            ),
        ]
    )
)

# Parse the JSON string, then explode: one output row per array element.
parsed = df.withColumn("SH", F.explode(F.from_json("SH", schema)))
parsed.select("ID", "SH.*").show()

# The select above still leaves FP and NJ as struct columns. To get the
# exact layout you asked for (every field flattened and prefixed with
# "SH_", e.g. SH_FP_AD), address the nested fields explicitly and alias
# them. Columns such as EN that come from outside this JSON must be
# added separately.
flat_cols = [
    F.col(f"SH.{f.name}").alias(f"SH_{f.name}")
    for f in schema.elementType.fields
    if f.name not in ("FP", "NJ")
]
flat_cols += [F.col(f"SH.FP.{c}").alias(f"SH_FP_{c}") for c in ("AD", "DD", "NA", "NW")]
flat_cols += [F.col(f"SH.NJ.{c}").alias(f"SH_NJ_{c}") for c in ("FD", "PL", "TH")]
parsed.select("ID", *flat_cols).show()
+--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+
| ID| AJ| BW| CA| CF| DT| EP| IL| ML| FP| NJ|
+--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+
|64ecb76b5e6452.63...| 0|888300| 0| 0| 25|378368| null|76.0|{44098, 46, 44098...|{26, 6, 0.0}|
|64ecb76b5e6452.63...| 0|887749| 0| 0| 25|438380| null|72.0|{44106, 46, 44099...|{25, 5, 0.0}|
|64ecb76b5e6452.63...| 0|870441| 0| 0| 25|498392| null|67.0|{44098, 46, 44098...|{26, 6, 0.0}|
|64ecb76b5e6452.63...| 0|875936| 0| 0| 25|558404| null|74.0|{44099, 46, 44099...|{25, 5, 0.0}|
|64ecb76b5e6452.63...| 0|878518| 0| 0| 23|618417|62.288616|82.0|{44105, 49, 44105...|{25, 5, 0.0}|
+--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+