Tags: json, scala, apache-kafka, deserialization, spark-structured-streaming

Why does this JSON give null values when I deserialize it and try to extract values?


I'm using Spark Structured Streaming to consume messages sent from a few Kafka topics. The JSON string is structured like this, and I want to extract 'created_at', 'text', and 'tag':

{"data":
  {"created_at":"***",
   "id":"***",
   "text":"***"},
 "matching_rules":
   [{"id":"***",
     "tag":"***"}]
}

I wrote the following schema:

val DFschema = StructType(Array(
      StructField("data", StructType(Array(
        StructField("created_at", TimestampType),
        StructField("text", StringType)))),
      StructField("matching_rules", StructType(Array(
        StructField("tag", StringType)
      )))
    ))

When I use the schema with from_json() I can successfully extract 'created_at' and 'text' as columns with non-null values using getField(), but when I try to do the same for 'tag', its column is populated with nulls:

val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("failOnDataLoss", "false")
      .option("subscribe", topics)
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(col("key"), from_json($"value", DFschema).alias("structdata"))
      .select($"key",
        $"structdata.data".getField("created_at").alias("created_at"),
        $"structdata.data".getField("text").alias("text"),
        $"structdata.matching_rules".getField("tag").alias("topic")
      )
      .withColumn("hour", date_format(col("created_at"), "HH"))
      .withColumn("date", date_format(col("created_at"), "yyyy-MM-dd"))

Looking at the JSON, I see that 'id' and 'tag' are wrapped in square brackets, which leads me to suspect I've left out a datatype in the schema, but I'm not experienced enough to know which one. I'd appreciate the help.


Solution

  • For arrays, you have to wrap your StructType with ArrayType as below:

    import org.apache.spark.sql.types._

    val DFschema = StructType(Array(
      StructField("data", StructType(Array(
        StructField("created_at", TimestampType),
        StructField("id", StringType),
        StructField("text", StringType)
      ))),
      StructField("matching_rules", ArrayType(StructType(Array(
        StructField("id", StringType),
        StructField("tag", StringType)
      ))))
    ))
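
    With the array in place, structdata.matching_rules.tag resolves to an array of strings rather than a single value. Here is a minimal sketch of pulling the first rule's tag, assuming each message carries exactly one matching rule and that parsedDF is a hypothetical name for the DataFrame right after your from_json step, before the final select:

    import org.apache.spark.sql.functions.element_at

    // Sketch: dotted access over an array of structs yields an array of that
    // field's values, so element_at(..., 1) takes the first (and, by the
    // assumption above, only) tag. Note that element_at is 1-based.
    val extracted = parsedDF.select(
      $"key",
      $"structdata.data.created_at".alias("created_at"),
      $"structdata.data.text".alias("text"),
      element_at($"structdata.matching_rules.tag", 1).alias("tag")
    )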
    

    Alternatively, assuming content is the column holding the raw JSON string, you can pass a DDL-formatted schema string straight to from_json:

    val parsed = ds.withColumn("content",
      expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')")
    )
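
    Each key is then available as a nested column of content, e.g. expr("content.data.text") or expr("content.matching_rules[0].tag") (a sketch; array indexing in this SQL syntax is zero-based).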
    

    You can also ask Spark to derive the schema string for you with schema_of_json.
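
    For instance, a minimal sketch (the sample string below is a hypothetical stand-in for one raw message from the topic):

    import org.apache.spark.sql.functions.{lit, schema_of_json}

    // Hypothetical sample value; substitute one real message from the topic.
    val sample = """{"data":{"created_at":"...","id":"...","text":"..."},"matching_rules":[{"id":"...","tag":"..."}]}"""

    // schema_of_json infers a DDL schema string from the literal, which can
    // then be pasted into the from_json call above.
    spark.range(1).select(schema_of_json(lit(sample))).show(false)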

    Good luck!