Search code examples
apache-sparkpysparkapache-spark-sqlspark-streamingspark-structured-streaming

PySpark Dataframe Transformation pyspark


I have a below dataframe which i need to transform as below.

i am using PySpark 3.4.1 .

+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID                     |SH                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|64ecb76b5e6452.63046706|[{"AJ":0,"BW":888300,"CA":0,"CF":0,"DT":25,"EP":378368,"ML":76.0,"FP":{"AD":44098,"DD":46,"NA":44098,"NW":46},"NJ":{"FD":26,"PL":6,"TH":0.0}},{"AJ":0,"BW":887749,"CA":0,"CF":0,"DT":25,"EP":438380,"ML":72.0,"FP":{"AD":44106,"DD":46,"NA":44099,"NW":46},"NJ":{"FD":25,"PL":5,"TH":0.0}},{"AJ":0,"BW":870441,"CA":0,"CF":0,"DT":25,"EP":498392,"ML":67.0,"FP":{"AD":44098,"DD":46,"NA":44098,"NW":46},"NJ":{"FD":26,"PL":6,"TH":0.0}},{"AJ":0,"BW":875936,"CA":0,"CF":0,"DT":25,"EP":558404,"ML":74.0,"FP":{"AD":44099,"DD":46,"NA":44099,"NW":46},"NJ":{"FD":25,"PL":5,"TH":0.0}},{"AJ":0,"BW":878518,"CA":0,"CF":0,"DT":23,"EP":618417,"IL":62.288616,"ML":82.0,"FP":{"AD":44105,"DD":49,"NA":44105,"NW":49},"NJ":{"FD":25,"PL":5,"TH":0.0}}]|
+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

i am creating this dataframe (spark streaming job) after consuming from Kafka and using json_get_object function to get ID and SH from json object

now i need to transfomrm as below.

+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|EN  |ID                     |EP  |SH_AJ|SH_BW   |SH_CA|SH_CF|SH_DT|SH_EP  |SH_FP_AD|SH_FP_DD|SH_FP_NA|SH_FP_NW|SH_NJ_FD|SH_NJ_PL|SH_NJ_TH|
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+
|null|64ecb5227ecfe0.13927687|null|0.0  |674969.0|0.0  |0.0  |25.0 |78727.0|44099.0 |45.0    |44099.0 |45.0    |21.0    |1.0     |0.0     |
+----+-----------------------+----+-----+--------+-----+-----+-----+-------+--------+--------+--------+--------+--------+--------+--------+

i have tried explode and explode_outer tried other stuff as well but i am not able to explode json array.

this is the acutal JSON whhich i am getting from Kafka.

{
   "ID":"64ecb76b5e6452.63046706",
   "SH":[
      {
         "AJ":0,
         "BW":888300,
         "CA":0,
         "CF":0,
         "DT":25,
         "EP":378368,
         "ML":76.000000,
         "FP":{
            "AD":44098,
            "DD":46,
            "NA":44098,
            "NW":46
         },
         "NJ":{
            "FD":26,
            "PL":6,
            "TH":0.000000
         }
      },
      {
         "AJ":0,
         "BW":887749,
         "CA":0,
         "CF":0,
         "DT":25,
         "EP":438380,
         "ML":72.000000,
         "FP":{
            "AD":44106,
            "DD":46,
            "NA":44099,
            "NW":46
         },
         "NJ":{
            "FD":25,
            "PL":5,
            "TH":0.000000
         }
      },
      {
         "AJ":0,
         "BW":870441,
         "CA":0,
         "CF":0,
         "DT":25,
         "EP":498392,
         "ML":67.000000,
         "FP":{
            "AD":44098,
            "DD":46,
            "NA":44098,
            "NW":46
         },
         "NJ":{
            "FD":26,
            "PL":6,
            "TH":0.000000
         }
      },
      {
         "AJ":0,
         "BW":875936,
         "CA":0,
         "CF":0,
         "DT":25,
         "EP":558404,
         "ML":74.000000,
         "FP":{
            "AD":44099,
            "DD":46,
            "NA":44099,
            "NW":46
         },
         "NJ":{
            "FD":25,
            "PL":5,
            "TH":0.000000
         }
      },
      {
         "AJ":0,
         "BW":878518,
         "CA":0,
         "CF":0,
         "DT":23,
         "EP":618417,
         "IL":62.288616,
         "ML":82.000000,
         "FP":{
            "AD":44105,
            "DD":49,
            "NA":44105,
            "NW":49
         },
         "NJ":{
            "FD":25,
            "PL":5,
            "TH":0.000000
         }
      }
   ]
}

Solution

  • Current situation

    The schema of your dataframe seems to be only strings.

    df.show()
    
    +--------------------+--------------------+
    |                  ID|                  SH|
    +--------------------+--------------------+
    |64ecb76b5e6452.63...|[{"AJ":0,"BW":888...|
    +--------------------+--------------------+
    
    
    df.printSchema()
    
    root
     |-- ID: string (nullable = true)
     |-- SH: string (nullable = true)
    

    Therefore, you need to input a schema to be able to do anything with your data.

    Definition of the schema

    from pyspark.sql import functions as F, types as T
    
    schema = T.ArrayType(
        T.StructType(
            [
                T.StructField("AJ", T.IntegerType()),
                T.StructField("BW", T.IntegerType()),
                T.StructField("CA", T.IntegerType()),
                T.StructField("CF", T.IntegerType()),
                T.StructField("DT", T.IntegerType()),
                T.StructField("EP", T.IntegerType()),
                T.StructField("IL", T.FloatType()),
                T.StructField("ML", T.FloatType()),
                T.StructField(
                    "FP",
                    T.StructType(
                        [
                            T.StructField("AD", T.IntegerType()),
                            T.StructField("DD", T.IntegerType()),
                            T.StructField("NA", T.IntegerType()),
                            T.StructField("NW", T.IntegerType()),
                        ]
                    ),
                ),
                T.StructField(
                    "NJ",
                    T.StructType(
                        [
                            T.StructField("FD", T.IntegerType()),
                            T.StructField("PL", T.IntegerType()),
                            T.StructField("TH", T.FloatType()),
                        ]
                    ),
                ),
            ]
        )
    )
    

    Parse JSON string with the schema

    df.withColumn("SH", F.explode(F.from_json("SH", schema))).select("ID", "SH.*").show()
    
    +--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+
    |                  ID| AJ|    BW| CA| CF| DT|    EP|       IL|  ML|                  FP|          NJ|
    +--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+
    |64ecb76b5e6452.63...|  0|888300|  0|  0| 25|378368|     null|76.0|{44098, 46, 44098...|{26, 6, 0.0}|
    |64ecb76b5e6452.63...|  0|887749|  0|  0| 25|438380|     null|72.0|{44106, 46, 44099...|{25, 5, 0.0}|
    |64ecb76b5e6452.63...|  0|870441|  0|  0| 25|498392|     null|67.0|{44098, 46, 44098...|{26, 6, 0.0}|
    |64ecb76b5e6452.63...|  0|875936|  0|  0| 25|558404|     null|74.0|{44099, 46, 44099...|{25, 5, 0.0}|
    |64ecb76b5e6452.63...|  0|878518|  0|  0| 23|618417|62.288616|82.0|{44105, 49, 44105...|{25, 5, 0.0}|
    +--------------------+---+------+---+---+---+------+---------+----+--------------------+------------+