apache-spark, pyspark, apache-kafka, spark-structured-streaming

Extracting "payload" from Kafka Connect JSONConverter messages with (schema & payload) using Spark Structured Streaming (pyspark)


What I'm trying to accomplish is exactly what this question (here) asks about; however, in my case I'm using Python/PySpark, not Scala.

I'm trying to extract the "payload" part of a Kafka Connect message that includes the schema as well.

Sample message:

    {
      "schema": {
        "type": "struct",
        "name": "emp_table",
        "fields": [
          {"field": "emp_id", "type": "string"},
          {"field": "emp_name", "type": "String"},
          {"field": "city", "type": "string"},
          {"field": "emp_sal", "type": "string"},
          {"field": "manager_name", "type": "string"}
        ]
      },
      "payload": {
        "emp_id": "1",
        "emp_name": "abc",
        "city": "NYK",
        "emp_sal": "100000",
        "manager_name": "xyz"
      }
    }

Step 1 - Define the schema for the "payload" part:

    from pyspark.sql.types import StructType, StructField, StringType

    payload_schema = StructType([
        StructField("emp_id", StringType(), False),
        StructField("emp_name", StringType(), True),
        StructField("city", StringType(), True),
        StructField("emp_sal", StringType(), True),
        StructField("manager_name", StringType(), True)
    ])

Step 2 - Read from Kafka:

    # Broker address and topic below are placeholders.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "emp_topic") \
        .load()

Step 3 - Get the message value from the Kafka message:

    kafka_df = df.selectExpr("CAST(value AS STRING)")

Step 4 - Extract "payload" only (I'm stuck here):

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.from_json(psf.col('value'), payload_schema).alias("DF")) \
        .select("DF.*")

I'm stuck at this part, as I can't figure out how to extract the payload from the JSON string before passing it to the from_json() function.

Note: I'm aware that I need to define the full schema for the entire message before I can make use of it in from_json(); however, I'm trying to get only the "payload" JSON string part.


Solution

  • You could make use of the SQL function get_json_object:

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.get_json_object(kafka_df['value'], "$.payload").alias('payload')) \
        .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF")) \
        .select("DF.*")
    

    Alternatively, you can define the full schema for the entire message and use that in from_json.

    In that case, your schema should look like this:

    from pyspark.sql.types import StructType, StructField, StringType, ArrayType

    full_schema = StructType([
        StructField("schema", StructType([
            StructField("type", StringType(), False),
            StructField("name", StringType(), False),
            StructField("fields", ArrayType(StructType([
                StructField("field", StringType(), False),
                StructField("type", StringType(), False)
            ])), False)
        ]), False),
        StructField("payload", StructType([
            StructField("emp_id", StringType(), False),
            StructField("emp_name", StringType(), True),
            StructField("city", StringType(), True),
            StructField("emp_sal", StringType(), True),
            StructField("manager_name", StringType(), True)
        ]), True)
    ])
    

    Note that the "fields" entry in the message is a JSON array of objects, which is why it is declared as an ArrayType wrapping a StructType above.

    Once this is done, you can select the payload fields with:

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.from_json(psf.col('value'), full_schema).alias("DF")) \
        .select("DF.payload.*")