PySpark DataFrame Transformation

I have the DataFrame below, which I need to transform as shown further down.

I am using PySpark 3.4.1.

|ID|SH|

I create this DataFrame in a Spark streaming job after consuming from Kafka, using the get_json_object function to extract ID and SH from the JSON object.

Now I need to transform it as below.

|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |
|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |
|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |

I have tried explode and explode_outer, among other things, but I am not able to explode the JSON array.

This is the actual JSON which I am getting from Kafka.



  • Current situation

    The schema of your dataframe seems to be only strings.
    |                  ID|                  SH|
     |-- ID: string (nullable = true)
     |-- SH: string (nullable = true)

    Because SH is a plain string, explode has no array to work on. You need to supply a schema so that Spark can parse SH into an array of structs first.
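    To make the problem concrete, here is a plain-Python sketch (no Spark involved) of what "parse, then explode" means. The record and its values are made up for illustration; only the field names ID, SH, AJ and BW come from the question.

```python
import json

# Hypothetical record: SH is a *string* that happens to contain a JSON array.
record = {
    "ID": "id-1",
    "SH": '[{"AJ": 0, "BW": 888300}, {"AJ": 0, "BW": 887749}]',
}

# Before parsing there is nothing to explode: SH is just text.
assert isinstance(record["SH"], str)

# Step 1: parse the string into a list of objects (the role from_json plays in Spark).
parsed = json.loads(record["SH"])

# Step 2: emit one output row per array element (the role explode plays in Spark).
rows = [{"ID": record["ID"], **item} for item in parsed]

for row in rows:
    print(row)
```

    The difference in Spark is that the parsing step needs an explicit schema, because column types have to be known before the query runs.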

    Definition of the schema

    from pyspark.sql import functions as F, types as T

    # SH holds a JSON array of objects, so the schema is an array of structs.
    # FP and NJ are nested structs; the grouping of their fields is inferred
    # from the FP and NJ columns in the output of the parse step.
    schema = T.ArrayType(
        T.StructType([
            T.StructField("AJ", T.IntegerType()),
            T.StructField("BW", T.IntegerType()),
            T.StructField("CA", T.IntegerType()),
            T.StructField("CF", T.IntegerType()),
            T.StructField("DT", T.IntegerType()),
            T.StructField("EP", T.IntegerType()),
            T.StructField("IL", T.FloatType()),
            T.StructField("ML", T.FloatType()),
            T.StructField("FP", T.StructType([
                T.StructField("AD", T.IntegerType()),
                T.StructField("DD", T.IntegerType()),
                T.StructField("NA", T.IntegerType()),
                T.StructField("NW", T.IntegerType()),
            ])),
            T.StructField("NJ", T.StructType([
                T.StructField("FD", T.IntegerType()),
                T.StructField("PL", T.IntegerType()),
                T.StructField("TH", T.FloatType()),
            ])),
        ])
    )

    Parse JSON string with the schema

    df.withColumn("SH", F.explode(F.from_json("SH", schema))).select("ID", "SH.*").show()
    |                  ID| AJ|    BW| CA| CF| DT|    EP|       IL|  ML|                  FP|          NJ|
    |64ecb76b5e6452.63...|  0|888300|  0|  0| 25|378368|     null|76.0|{44098, 46, 44098...|{26, 6, 0.0}|
    |64ecb76b5e6452.63...|  0|887749|  0|  0| 25|438380|     null|72.0|{44106, 46, 44099...|{25, 5, 0.0}|
    |64ecb76b5e6452.63...|  0|870441|  0|  0| 25|498392|     null|67.0|{44098, 46, 44098...|{26, 6, 0.0}|
    |64ecb76b5e6452.63...|  0|875936|  0|  0| 25|558404|     null|74.0|{44099, 46, 44099...|{25, 5, 0.0}|
    |64ecb76b5e6452.63...|  0|878518|  0|  0| 23|618417|62.288616|82.0|{44105, 49, 44105...|{25, 5, 0.0}|