Can't Transform Kafka JSON Data in Spark Structured Streaming


I am trying to read Kafka messages and process them with Spark in standalone mode. Kafka stores the data in JSON format. I can read the messages, but I cannot parse the JSON data with a defined schema.

When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning to see the messages in the Kafka topic, it prints the following:

"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"

And I can read this data successfully with this code block in Spark:

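from pyspark.sql.functions import col

# Read the topic as a stream and keep only the message value as a string
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_kafka_topic") \
    .load() \
    .select(col("value").cast("string"))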

The schema is like this:

df.printSchema()

root
 |-- value: string (nullable = true)

Then I write this dataframe to the console, and it prints the Kafka messages:

Batch: 9
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+

But I want to parse the JSON data using a defined schema. This is the code block I tried:

from pyspark.sql.types import (ArrayType, BooleanType, IntegerType, LongType,
                               StringType, StructField, StructType)

schema = StructType([
    StructField("timestamp", LongType(), False),
    StructField("values", ArrayType(
        StructType([
            StructField("id", StringType(), True),
            StructField("v", IntegerType(), False),
            StructField("q", BooleanType(), False),
            StructField("t", LongType(), False)
        ]), True), True)
])

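from pyspark.sql.functions import col, from_json

# Same Kafka source as before, but parse the value against the schema
parsed = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "my_kafka_topic") \
  .load() \
  .select(from_json(col("value").cast("string"), schema).alias("opc"))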

And this is the schema of the parsed dataframe:

parsed.printSchema()
root
  |-- opc: struct (nullable = true)
  |    |-- timestamp: string (nullable = true)
  |    |-- values: struct (nullable = true)
  |    |    |-- id: string (nullable = true)
  |    |    |-- v: integer (nullable = true)
  |    |    |-- q: boolean (nullable = true)
  |    |    |-- t: string (nullable = true)

These code blocks run without errors. But when I write the parsed dataframe to the console:

query = parsed\
   .writeStream\
   .format("console")\
   .start()

query.awaitTermination()

it writes null to the console, like this:

+----+
| opc|
+----+
|null|
+----+

So there seems to be a problem with parsing the JSON data, but I can't figure out what it is.

Can you tell me what is wrong?


Solution

  • It seems that the schema was not correct for your case. Please try the following one:

    schema = StructType([
        StructField("timestamp", LongType(), False),
        StructField("values", ArrayType(
            StructType([
                StructField("id", StringType(), True),
                StructField("v", IntegerType(), False),
                StructField("q", BooleanType(), False),
                StructField("t", LongType(), False)
            ]), True), True)
    ])
    

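    Also remember that Spark's schema inference works pretty well on static data (for example, a batch read of a sample with spark.read.json), so you could let Spark discover the schema once and save it for the streaming job.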

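    Another issue is that your JSON data has leading and trailing double quotes (") and also contains backslashes (\). Each message is therefore a JSON object wrapped inside a quoted string rather than a plain JSON object, which was preventing Spark from parsing it against the schema.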
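
The wrapping can be reproduced outside Spark. The following sketch uses only Python's standard json module on the first sample line from the question: one decoding pass yields a string, and only a second pass yields the actual object.

```python
import json

# First record from the question, exactly as the console consumer printed it
raw = r'"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"'

first_pass = json.loads(raw)     # a str holding the inner JSON text
record = json.loads(first_pass)  # the actual object, as a dict

print(type(first_pass).__name__)
print(record["values"][0]["v"])
```

This suggests the producer serialized the payload twice, so fixing the producer would be an alternative to cleaning the value on the Spark side.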

    To remove the extra characters, your code should be modified as follows:

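    from pyspark.sql.functions import col, from_json, regexp_replace

    # 1) drop every backslash, 2) strip the leading and trailing double quote,
    # 3) parse the cleaned string against the schema
    parsed = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "my_kafka_topic") \
      .load() \
      .withColumn("value", regexp_replace(col("value").cast("string"), "\\\\", "")) \
      .withColumn("value", regexp_replace(col("value"), "^\"|\"$", "")) \
      .select(from_json(col("value"), schema).alias("opc"))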
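
To see what the two replacements do to a single message, here is a minimal sketch that applies the same two patterns with Python's re module instead of Spark. The sample is the second line from the question. For patterns this simple, Python and Spark's Java regex engine should behave the same, but treat that as an assumption.

```python
import json
import re

# Second record from the question, as printed by the console consumer
raw = r'"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"'

# Step 1: remove every backslash (same regex as "\\\\" in the Spark code)
no_backslashes = re.sub(r"\\", "", raw)

# Step 2: strip the leading and the trailing double quote
cleaned = re.sub(r'^"|"$', "", no_backslashes)

record = json.loads(cleaned)  # now parses as a plain JSON object
print(cleaned)
print(record["values"][0]["t"])
```

Note that removing every backslash is safe for payloads like this sample, but it would also strip legitimate escapes if a string value ever contained an escaped quote or backslash.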
    

    After the two regexp_replace steps, the value column should contain valid JSON that from_json can parse:

    +------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                             |
    +------------------------------------------------------------------------------------------------------------------+
    |{"timestamp":1553588718638,"values":[{"id":"Simulation.Simulator.Temperature","v":26,"q":true,"t":1553588717036}]}|
    +------------------------------------------------------------------------------------------------------------------+
    

    Good luck!