Sending JSON events to Kafka in non-stringified format


I have created a DataFrame like the one below, using the to_json() method to build the JSON array value.

+-----------------------------------------------------------------------------------------------------------+
|json_data                                                                                                  |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+

I am using the code below to send the DataFrame to a Kafka topic. But when I consume the data from that topic, I can see that the JSON data has been stringified.

Code to push the data to Kafka:

outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("topic", "topic_test")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("checkpointLocation", checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("truncate", false)
        .save()

Stringified data received from Kafka:

{
    "name": "sensor1",
    "value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}

How can I send the data to the Kafka topic so that the nested JSON is not stringified in the output?


Solution

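  • json_data is already of type string, and you are passing it to to_json(struct("*")) again. The second to_json call treats it as a plain string value, so its quotes get escaped instead of being emitted as nested JSON.

    Check the value column that is going to Kafka: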

    df.withColumn("value",to_json(struct($"*"))).show(false)
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
    |json_data                                                                                                  |value                                                                                                                                      |
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
    |{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
    

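    Try the code below. It builds value-array as a real array of structs rather than a JSON string, so to_json serializes it only once. It assumes df still has the name, time, sensorvalue, tag1 and Key columns.

     // Requires: import org.apache.spark.sql.functions._ and import spark.implicits._
     df
     // Build value-array as array<struct>, not as a JSON string
     .withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
     // Use select, not selectExpr: to_json(...) here is a Column, not a SQL string
     .select($"Key".cast("string").as("key"),to_json(struct($"name",$"value-array")).as("value"))
     .write
     .format("kafka")
     .option("topic", "topic_test")
     .option("kafka.bootstrap.servers", "localhost:9093")
     .option("checkpointLocation", checkpointPath)
     .option("kafka.sasl.mechanism", "PLAIN")
     .option("kafka.security.protocol", "SASL_SSL")
     .option("truncate", false)
     .save()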
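
If the DataFrame only carries the already-stringified column (as the consumed output in the question suggests), another option is to parse that column back into a typed array with from_json before calling to_json. The following is a minimal sketch, not the original job: the local SparkSession, the sample row, the object name NestedJsonSketch and the hand-written element schema are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.types.{ArrayType, DoubleType, StringType, StructField, StructType}

// Hypothetical, self-contained sketch; names and sample data are assumptions.
object NestedJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nested-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed input: "value-array" was produced earlier by to_json, so it is a string column.
    val df = Seq(
      ("k1", "sensor1",
        """[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]""")
    ).toDF("Key", "name", "value-array")

    // Assumed schema of the stringified array, written by hand from the sample row.
    val valueArraySchema = ArrayType(StructType(Seq(
      StructField("time", StringType),
      StructField("sensorvalue", DoubleType),
      StructField("tag1", StringType)
    )))

    val out = df
      // Parse the JSON string back into array<struct> so it is serialized only once below.
      .withColumn("value-array", from_json(col("value-array"), valueArraySchema))
      .select(
        col("Key").cast("string").as("key"),
        to_json(struct(col("name"), col("value-array"))).as("value")
      )

    // Inspect the value column before wiring it to the Kafka writer.
    out.show(false)
    spark.stop()
  }
}
```

Checking the value column with show(false) first, as in the answer above, is a cheap way to confirm that value-array appears as a JSON array and not as an escaped string before anything is written to the topic.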