Tags: apache-spark, pyspark, apache-spark-sql, spark-structured-streaming

PySpark Dataframe Transformation from_json


I have the JSON below, which I am reading from Kafka and then trying to convert to a StructType using the from_json function.

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema_session_start = StructType([
    StructField("ID", StringType()),
    StructField("SID", StringType()),
    StructField("EP", LongType()),
    StructField("IP", StringType()),
    StructField("LN", StringType()),
    StructField("VN", StringType()),
    StructField("DV", StructType([
        StructField("MK", StringType()),
        StructField("MDL", StringType()),
        StructField("OS", StringType()),
        StructField("OSVN", StringType()),
        StructField("AR", StringType())
    ])),
    StructField("MC", StringType()),
    StructField("FN", StringType()),
    StructField("NW", StructType([
        StructField("TP", StringType())
    ])),
    StructField("AL", StringType()),
    StructField("EN", StringType())
])
Sample row (columns `value` and `EN`):

value:
{"ID":"651551912131b2.07017577","SID":"169156360280217644","EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0","DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"},"MC":"02:00:00:00:00:00","FN":true,"NW":{"TP":"wifi_5"},"AL":"GRIPdemo","EN":"Session_Start"}

EN:
Session_Start
array_df = condition_df.withColumn("value_json",from_json(col("value"),when(condition_df.EN == "Session_Start", schema_session_start)))

I get the error below when I try the conversion:

ERROR:root:An error occurred: 'StructField' object has no attribute '_get_object_id'


Solution

  • The second argument of from_json must be either a DDL schema string or a StructType (see the docs), but in your case it is a Column (the result of when). If you want to apply a specific schema only to a given event type, you need to do it the other way around - move the when outside of from_json, something like this:

    array_df = condition_df.withColumn("value_json",
      when(condition_df.EN == "Session_Start",
         from_json(col("value"), schema_session_start)))
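
    Here is a minimal, self-contained sketch of the fix (it assumes a local SparkSession and uses a trimmed-down schema with only the ID and EN fields for brevity; the column names follow the question). Rows whose EN does not match get NULL in value_json, since when has no otherwise branch:

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json, when
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.master("local[1]").appName("from_json_demo").getOrCreate()

    # Trimmed-down schema for illustration only
    schema_session_start = StructType([
        StructField("ID", StringType()),
        StructField("EN", StringType()),
    ])

    condition_df = spark.createDataFrame(
        [('{"ID":"651551912131b2.07017577","EN":"Session_Start"}', "Session_Start"),
         ('{"ID":"abc123","EN":"Session_End"}', "Session_End")],
        ["value", "EN"],
    )

    # when wraps from_json, so the schema argument stays a StructType,
    # not a Column; only Session_Start rows are parsed.
    array_df = condition_df.withColumn(
        "value_json",
        when(condition_df.EN == "Session_Start",
             from_json(col("value"), schema_session_start)))

    rows = array_df.select("value_json.ID").collect()
    # rows[0].ID is the parsed ID; rows[1].ID is None (no matching when branch)
    ```

    If you later add more event types, you can chain further when clauses (or an otherwise), each calling from_json with its own StructType.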