What I'm trying to accomplish is exactly what this question asks about (here); however, in my case I'm using Python/PySpark, not Scala.
I'm trying to extract the "payload" part of a Kafka Connect message that includes the schema as well.
Sample message:
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
Step 1 - Define the schema for the "payload" part:

from pyspark.sql.types import StructType, StructField, StringType

payload_schema = StructType([
    StructField("emp_id", StringType(), False),
    StructField("emp_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("emp_sal", StringType(), True),
    StructField("manager_name", StringType(), True)
])
Step 2 - Read from Kafka (the broker address and topic below are placeholders):

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "emp_topic") \
    .load()
Step 3 - Get the message value from the Kafka message:

kafka_df = df.selectExpr("CAST(value AS STRING)")  # the Kafka value column arrives as binary
Step 4 - Extract "payload" only (I'm stuck here):
import pyspark.sql.functions as psf

emp_df = kafka_df \
    .select(psf.from_json(psf.col('value'), payload_schema).alias("DF")) \
    .select("DF.*")
I'm stuck at this point, as I can't figure out how to extract the "payload" from the JSON string before passing it to the from_json() function.
Note: I'm aware that I need to define the full schema for the entire message before I can make use of it in from_json(); however, I'm trying to get only the "payload" JSON string part.
You could make use of the SQL function get_json_object:
import pyspark.sql.functions as psf

emp_df = kafka_df \
    .select(psf.get_json_object(kafka_df['value'], "$.payload").alias('payload')) \
    .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF")) \
    .select("DF.*")
Or, you can define the full schema for the entire message and make use of that in from_json. In that case your schema would look like this:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

full_schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType(), False),
        StructField("name", StringType(), False),
        StructField("fields", ArrayType(StructType([
            StructField("field", StringType(), False),
            StructField("type", StringType(), False)
        ])))
    ])),
    StructField("payload", StructType([
        StructField("emp_id", StringType(), False),
        StructField("emp_name", StringType(), True),
        StructField("city", StringType(), True),
        StructField("emp_sal", StringType(), True),
        StructField("manager_name", StringType(), True)
    ]))
])
Please double-check this schema definition against your actual messages; note that the "fields" entry is a JSON array of objects, which is why it is wrapped in ArrayType rather than being a plain StructType.
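If hand-writing the nested schema gets tedious, Spark 2.4+ can also infer it from a sample message via schema_of_json; a rough sketch (this is my suggestion, not part of the original approach, and the sample literal is abbreviated here):

import pyspark.sql.functions as psf

sample = '{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"}]},"payload":{"emp_id":"1"}}'
# schema_of_json yields a DDL string that from_json can consume directly
inferred_ddl = spark.range(1).select(psf.schema_of_json(sample)).first()[0]
emp_df = kafka_df \
    .select(psf.from_json(psf.col('value'), inferred_ddl).alias("DF")) \
    .select("DF.payload.*")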
As soon as this is done, you can select the payload fields with:
import pyspark.sql.functions as psf

emp_df = kafka_df \
    .select(psf.from_json(psf.col('value'), full_schema).alias("DF")) \
    .select("DF.payload.*")