Search code examples
dataframeapache-sparkapache-spark-sqlcassandraspark-cassandra-connector

How to query JSON data column using Spark DataFrames?


I have a Cassandra table that for simplicity looks something like:

key: text
jsonData: text
blobData: blob

I can create a basic data frame for this using spark and the spark-cassandra-connector using:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

I'm struggling though to expand the JSON data into its underlying structure. I ultimately want to be able to filter based on the attributes within the json string and return the blob data. Something like jsonData.foo = "bar" and return blobData. Is this currently possible?


Solution

  • Spark >= 2.4

    If needed, schema can be determined using schema_of_json function (please note that this assumes that an arbitrary row is a valid representative of the schema).

    import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
    import collection.JavaConverters._
    
    val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
    df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
    

    Spark >= 2.1

    You can use from_json function:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("k", StringType, true), StructField("v", DoubleType, true)
    ))
    
    df.withColumn("jsonData", from_json($"jsonData", schema))
    

    Spark >= 1.6

    You can use get_json_object which takes a column and a path:

    import org.apache.spark.sql.functions.get_json_object
    
    val exprs = Seq("k", "v").map(
      c => get_json_object($"jsonData", s"$$.$c").alias(c))
    
    df.select($"*" +: exprs: _*)
    

    and extracts fields to individual strings which can be further casted to expected types.

    The path argument is expressed using dot syntax, with leading $. denoting document root (since the code above uses string interpolation $ has to be escaped, hence $$.).

    Spark <= 1.5:

    Is this currently possible?

    As far as I know it is not directly possible. You can try something similar to this:

    val df = sc.parallelize(Seq(
      ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
      ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
    )).toDF("key", "jsonData", "blobData")
    

    I assume that blob field cannot be represented in JSON. Otherwise you cab omit splitting and joining:

    import org.apache.spark.sql.Row
    
    val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
    val jsons = sqlContext.read.json(df.drop("blobData").map{
      case Row(key: String, json: String) =>
        s"""{"key": "$key", "jsonData": $json}"""
    }) 
    
    val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
    parsed.printSchema
    
    // root
    //  |-- jsonData: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: double (nullable = true)
    //  |-- key: long (nullable = true)
    //  |-- blobData: string (nullable = true)
    

    An alternative (cheaper, although more complex) approach is to use an UDF to parse JSON and output a struct or map column. For example something like this:

    import net.liftweb.json.parse
    
    case class KV(k: String, v: Int)
    
    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })
    
    val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
    parsed.show
    
    // +---+--------------------+------------------+----------+
    // |key|            jsonData|          blobData|parsedJSON|
    // +---+--------------------+------------------+----------+
    // |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
    // |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
    // +---+--------------------+------------------+----------+
    
    parsed.printSchema
    
    // root
    //  |-- key: string (nullable = true)
    //  |-- jsonData: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    //  |-- parsedJSON: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: integer (nullable = false)