I have the JSON below, which I am reading from Kafka and then trying to convert to a StructType using the from_json function.
schema_session_start = StructType([
    StructField("ID", StringType()),
    StructField("SID", StringType()),
    StructField("EP", LongType()),
    StructField("IP", StringType()),
    StructField("LN", StringType()),
    StructField("VN", StringType()),
    StructField("DV", StructType([
        StructField("MK", StringType()),
        StructField("MDL", StringType()),
        StructField("OS", StringType()),
        StructField("OSVN", StringType()),
        StructField("AR", StringType())
    ])),
    StructField("MC", StringType()),
    StructField("FN", StringType()),
    StructField("NW", StructType([
        StructField("TP", StringType())
    ])),
    StructField("AL", StringType()),
    StructField("EN", StringType())
])
value | EN |
---|---|
{"ID":"651551912131b2.07017577","SID":"169156360280217644","EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0","DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"},"MC":"02:00:00:00:00:00","FN":true,"NW":{"TP":"wifi_5"},"AL":"GRIPdemo","EN":"Session_Start"} | Session_Start |
array_df = condition_df.withColumn(
    "value_json",
    from_json(col("value"), when(condition_df.EN == "Session_Start", schema_session_start)))
I am getting the error below when I try to convert:
ERROR:root:An error occurred: 'StructField' object has no attribute '_get_object_id'
The second argument of from_json should be either a string with the schema or a StructType (see docs), but in your case it is a Column, because that is what when(...) returns. If you want to apply a specific schema only to a given event type, you need to do it differently: move when outside of from_json, something like this:
array_df = condition_df.withColumn(
    "value_json",
    when(condition_df.EN == "Session_Start",
         from_json(col("value"), schema_session_start)))