Tags: scala, apache-spark, apache-spark-sql, spark-structured-streaming

How to parse JSON records in Structured Streaming?


I'm working on a Spark Structured Streaming app and I'm trying to parse JSON in the format below.

{"name":"xyz","age":29,"details":["city":"mumbai","country":"India"]}
{"name":"abc","age":25,"details":["city":"mumbai","country":"India"]}

Below is my Spark code to parse the JSON:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// Expected shape of each record; details is declared as a nested struct
val schema = new StructType()
  .add("name", DataTypes.StringType)
  .add("age", DataTypes.IntegerType)
  .add("details",
    new StructType()
      .add("city", DataTypes.StringType)
      .add("country", DataTypes.StringType)
  )

// Convert the binary value column to text
val dfLogLines = dfRawData.selectExpr("CAST(value AS STRING)")

val personNestedDf = dfLogLines.select(from_json($"value", schema).as("person"))
val personFlattenedDf = personNestedDf.selectExpr("person.name", "person.age")

personFlattenedDf.printSchema()
personFlattenedDf.writeStream
  .format("console")
  .option("checkpointLocation", checkpoint_loc3)
  .start()
  .awaitTermination()

Output:

root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+
|name| age|
+----+----+
|null|null|
|null|null|
+----+----+

The code does not throw any error, but it returns null values in the output. What am I doing wrong here? Thanks in advance.


Solution

  • tl;dr The JSON is not well-formed in the details field.


    From the documentation of the from_json standard function:

    Returns null, in the case of an unparseable string.

    The issue is with the details field.

    {"details":["city":"mumbai","country":"India"]}

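    The square brackets make it look like an array, and the key-value pairs make it look like a map (a JSON object), but it is valid as neither: array elements cannot be key-value pairs, and an object must be enclosed in curly braces. Compare how Spark serializes an array, a map, and a struct: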

    scala> Seq(Array("one", "two")).toDF("value").toJSON.show(truncate = false)
    +-----------------------+
    |value                  |
    +-----------------------+
    |{"value":["one","two"]}|
    +-----------------------+
    
    scala> Seq(Map("one" -> "two")).toDF("value").toJSON.show(truncate = false)
    +-----------------------+
    |value                  |
    +-----------------------+
    |{"value":{"one":"two"}}|
    +-----------------------+
    
    scala> Seq(("mumbai", "India")).toDF("city", "country").select(struct("city", "country") as "details").toJSON.show(truncate = false)
    +-----------------------------------------------+
    |value                                          |
    +-----------------------------------------------+
    |{"details":{"city":"mumbai","country":"India"}}|
    +-----------------------------------------------+
    

    My recommendation would be to do the JSON parsing yourself using a user-defined function (UDF).
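
    As a rough illustration of that idea, here is a minimal sketch that repairs the record before handing it to from_json, rather than parsing the whole record by hand. It assumes the only defect is that details uses square brackets instead of curly braces and that the bracket body contains no nested brackets. The names DetailsRepair and repairDetails are hypothetical helpers, not part of Spark.

```scala
import scala.util.matching.Regex

// Hypothetical helper (not a Spark API): rewrites the malformed
// "details":[ ... ] into a JSON object "details":{ ... }.
object DetailsRepair {
  // Matches "details":[ ... ] where the bracket body has no nested brackets.
  private val DetailsArray: Regex = """"details"\s*:\s*\[([^\[\]]*)\]""".r

  def repairDetails(json: String): String =
    if (json == null) null
    else DetailsArray.replaceAllIn(
      json,
      // quoteReplacement keeps any '$' or '\' in the data literal
      m => Regex.quoteReplacement("\"details\":{" + m.group(1) + "}")
    )
}

// Possible wiring in the streaming job (assumes dfLogLines and schema
// from the question, and spark.implicits._ in scope):
//   import org.apache.spark.sql.functions.{from_json, udf}
//   val repair = udf(DetailsRepair.repairDetails _)
//   val personNestedDf =
//     dfLogLines.select(from_json(repair($"value"), schema).as("person"))
```

    With the brackets repaired, the record matches the struct-based schema from the question, so from_json should no longer return null for it. If the upstream producer can be fixed to emit valid JSON instead, that is likely the cleaner option.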