
Why is this JSON giving null values when I deserialize it and try to extract values?

I'm using Spark Structured Streaming to consume messages sent from a few Kafka topics. The JSON string is structured like this, and I want to extract 'created_at', 'text' and 'tag':


I wrote the following schema:

import org.apache.spark.sql.types._

val DFschema = StructType(Array(
      StructField("data", StructType(Array(
        StructField("created_at", TimestampType),
        StructField("text", StringType)))),
      // matching_rules is declared here as a single struct
      StructField("matching_rules", StructType(Array(
        StructField("tag", StringType)))))))

When I use the schema with from_json(), I can successfully extract 'created_at' and 'text' as columns with non-null values using getField(), but when I do the same for 'tag', its column is filled with nulls:

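import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_format, from_json}
import spark.implicits._ // for the $"..." column syntax

val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("failOnDataLoss", "false")
      .option("subscribe", topics)
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(col("key"), from_json($"value", DFschema).alias("structdata"))
      // created_at is nested inside structdata.data, so it is addressed by its full path
      .withColumn("hour", date_format(col("structdata.data.created_at"), "HH"))
      .withColumn("date", date_format(col("structdata.data.created_at"), "yyyy-MM-dd"))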

Looking at the JSON, I see that 'id' and 'tag' are wrapped in square brackets, which makes me suspect I've left out a data type in the schema, but I'm not experienced enough to know which one. I'd appreciate any help.


  • The square brackets mean matching_rules is a JSON array, so you have to wrap its StructType in an ArrayType:

    import org.apache.spark.sql.types._

    val DFschema = StructType(Array(
      StructField("data", StructType(Array(
        StructField("created_at", TimestampType),
        StructField("id", StringType),
        StructField("text", StringType)))),
      // matching_rules holds a JSON array, so its struct is wrapped in ArrayType
      StructField("matching_rules", ArrayType(StructType(Array(
        StructField("tag", StringType),
        StructField("id", StringType)))))))
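With matching_rules declared as an ArrayType, the path structdata.matching_rules.tag resolves to an array of strings rather than a single string, so you still have to choose how to read it. Below is a minimal batch sketch, assuming Spark 3.x running locally. The object name ExtractTags and the sample message are hypothetical (the question's real JSON is not shown); the sample only mimics the described shape. It shows two options: select the tags as an array, or explode the rules so each tag gets its own row.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

object ExtractTags {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("extract-tags").getOrCreate()
  import spark.implicits._

  // Hypothetical message: it only mimics the shape described in the question.
  val sample: String =
    """{"data":{"created_at":"2021-01-01T10:15:30.000Z","id":"1","text":"hello"},"matching_rules":[{"id":"10","tag":"spark"},{"id":"11","tag":"kafka"}]}"""

  // Schema from the answer: matching_rules is an ARRAY of structs.
  val schema: StructType = StructType(Array(
    StructField("data", StructType(Array(
      StructField("created_at", TimestampType),
      StructField("id", StringType),
      StructField("text", StringType)))),
    StructField("matching_rules", ArrayType(StructType(Array(
      StructField("tag", StringType),
      StructField("id", StringType)))))))

  val parsed: DataFrame = Seq(sample).toDF("value")
    .select(from_json(col("value"), schema).alias("structdata"))

  // Option 1: one row per message; tag comes back as array<string>.
  val tagArray: List[String] =
    parsed.select(col("structdata.matching_rules.tag")).head().getSeq[String](0).toList

  // Option 2: explode the array so each matching rule becomes its own row
  // and tag is a plain string column.
  val exploded: DataFrame = parsed
    .select(
      col("structdata.data.text").alias("text"),
      explode(col("structdata.matching_rules")).alias("rule"))
    .select(col("text"), col("rule.tag").alias("tag"))

  val explodedTags: List[String] = exploded.collect().map(_.getString(1)).toList

  def main(args: Array[String]): Unit = {
    println(tagArray)
    exploded.show(false)
    spark.stop()
  }
}
```

Note that explode drops messages whose matching_rules is null or empty; explode_outer keeps them with a null tag, if that matters for your stream.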

    Alternatively, you can pass the schema as a DDL string inside a SQL expression (assuming content is your JSON column):

    import org.apache.spark.sql.functions.expr

    ds = ds.withColumn("content",
      expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')"))
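The DDL-string route can be tried on a one-row batch DataFrame before wiring it into the stream. This sketch assumes Spark 3.x running locally; the object name DdlSchemaExample and the sample message are hypothetical. It reuses the answer's schema string and then explodes content.matching_rules.tag inside selectExpr, so the whole pipeline stays in SQL expressions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object DdlSchemaExample {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("ddl-schema").getOrCreate()
  import spark.implicits._

  // Hypothetical message in the shape described in the question.
  val sample: String =
    """{"data":{"created_at":"2021-01-01T10:15:30.000Z","id":"1","text":"hello"},"matching_rules":[{"id":"10","tag":"spark"}]}"""

  // Same DDL schema string as the answer; "content" starts out as raw JSON text
  // and is replaced by the parsed struct.
  val ds = Seq(sample).toDF("content")
    .withColumn("content", expr(
      "from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')"))

  // explode inside a SQL expression yields one row per tag.
  val tags: List[String] = ds
    .selectExpr("content.data.text AS text", "explode(content.matching_rules.tag) AS tag")
    .collect().map(_.getString(1)).toList

  def main(args: Array[String]): Unit = {
    println(tags)
    spark.stop()
  }
}
```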

    You can also have Spark infer the schema from a sample JSON string with the schema_of_json function.
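A minimal sketch of letting Spark infer the schema, assuming Spark 3.x running locally, with a hypothetical InferSchemaExample object and sample message (in practice you would paste one real Kafka value). schema_of_json returns the inferred schema as a DDL-style string, and from_json also accepts that column directly. Inferred types can differ from a hand-written schema; for example, created_at will typically come back as a string rather than a timestamp.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, from_json, lit, schema_of_json}

object InferSchemaExample {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("schema-of-json").getOrCreate()
  import spark.implicits._

  // Hypothetical sample message; in practice use one real Kafka value.
  val sample: String =
    """{"data":{"created_at":"2021-01-01T10:15:30.000Z","id":"1","text":"hello"},"matching_rules":[{"id":"10","tag":"spark"}]}"""

  // schema_of_json infers a DDL-style schema string from the literal sample.
  val inferredDdl: String =
    spark.range(1).select(schema_of_json(lit(sample))).head().getString(0)

  // from_json can take the schema_of_json column directly as its schema.
  val tags: List[String] = Seq(sample).toDF("value")
    .select(from_json(col("value"), schema_of_json(lit(sample))).alias("structdata"))
    .select(explode(col("structdata.matching_rules.tag")).alias("tag"))
    .collect().map(_.getString(0)).toList

  def main(args: Array[String]): Unit = {
    println(inferredDdl) // matching_rules appears as an array of structs
    println(tags)
    spark.stop()
  }
}
```

Printing inferredDdl once and pasting it into the job as a fixed schema string avoids depending on a single sample at runtime.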

    Good luck!