Tags: apache-spark, apache-spark-sql, spark-structured-streaming

How to use from_json with schema as string (i.e. a JSON-encoded schema)?


I'm reading a stream from Kafka and converting the value from Kafka (which is JSON) into a struct.

from_json has a variant that takes a schema of type String, but I could not find an example. Please advise what is wrong with the code below.

Error

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  ) 
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

Program

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();

    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .load();
    employeeDataset.printSchema();
    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"),employeeSchema, new HashMap<>()));

    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");

    sparkSession.catalog().listTables().show();

    sqlCtx.sql("select * from employeeView").show();
}

Solution

  • Your question helped me find that the variant of from_json with a String-based schema was only available in the Java API and has recently been added to the Spark API for Scala in the upcoming 2.3.0. I had long held the strong belief that the Spark API for Scala was always the most feature-rich, and your question taught me that this was not the case before the change in 2.3.0 (!)

    Back to your question: you can actually define the string-based schema in either JSON or DDL format.
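Since the string in the question is closer to DDL than to JSON, here is the DDL route as a minimal sketch, assuming Spark 2.2 or later (where `StructType.fromDDL` exists). DDL writes nested types with angle brackets (`ARRAY<...>`, `STRUCT<name: TYPE, ...>`); the parentheses in `STRUCT ( ... )` are what the parser rejects with "extraneous input '('". The sample record and the names `employeeDdl` and `ddlSchema` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").appName("ddl-schema-sketch").getOrCreate()
import spark.implicits._

// Nested types in DDL use angle brackets: ARRAY<...> and STRUCT<name: TYPE, ...>.
// Writing "STRUCT ( ... )" with parentheses is what triggers the ParseException in the question.
val employeeDdl =
  "firstName STRING, lastName STRING, email STRING, " +
    "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

// StructType.fromDDL parses a comma-separated list of "name TYPE" column definitions
val ddlSchema: StructType = StructType.fromDDL(employeeDdl)
ddlSchema.printTreeString()

// Illustrative record; fields missing from the JSON (email, state, zip) come back as null
val sample = Seq(
  """{"firstName":"Jacek","lastName":"Laskowski","addresses":[{"city":"Warsaw"}]}"""
).toDF("rawjson")

// from_json(Column, StructType) works independently of how the String variant treats DDL
val parsed = sample.select(from_json(col("rawjson"), ddlSchema).as("json"))
parsed.select("json.*").show(truncate = false)
```

Going through `StructType` keeps the sketch independent of the String overload; in versions where that overload accepts DDL, `employeeDdl` could be passed to it directly instead.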

    Writing JSON by hand can be a bit cumbersome, so I'd take a different approach (which, given I'm a Scala developer, is fairly easy).

    Let's first define the schema using the Spark API for Scala.

    import org.apache.spark.sql.types._
    import spark.implicits._ // provides $"..."; already imported in spark-shell
    val addressesSchema = new StructType()
      .add($"city".string)
      .add($"state".string)
      .add($"zip".string)
    val schema = new StructType()
      .add($"firstName".string)
      .add($"lastName".string)
      .add($"email".string)
      .add($"addresses".array(addressesSchema))
    scala> schema.printTreeString
    root
     |-- firstName: string (nullable = true)
     |-- lastName: string (nullable = true)
     |-- email: string (nullable = true)
     |-- addresses: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- city: string (nullable = true)
     |    |    |-- state: string (nullable = true)
     |    |    |-- zip: string (nullable = true)
    

    That seems to match your schema, doesn't it?
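For reference, `$"city".string` is shorthand for a nullable `StructField` of `StringType`, and `.array(...)` wraps the element type in an `ArrayType` that allows null elements (hence `containsNull = true` above). A sketch of the same schema spelled out without the `$` implicits, which can be handy when no `SparkSession` is in scope; the `...Explicit` names are just for illustration:

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Element type of the addresses array: one nullable string field per column
val addressSchemaExplicit = StructType(Seq(
  StructField("city", StringType, nullable = true),
  StructField("state", StringType, nullable = true),
  StructField("zip", StringType, nullable = true)))

val schemaExplicit = StructType(Seq(
  StructField("firstName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("email", StringType, nullable = true),
  // containsNull = true matches "element: struct (containsNull = true)" in printTreeString
  StructField("addresses", ArrayType(addressSchemaExplicit, containsNull = true), nullable = true)))

schemaExplicit.printTreeString()
```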

    With that, converting the schema to a JSON-encoded string is a breeze with the json method.

    val schemaAsJson = schema.json
    

    schemaAsJson is exactly your JSON string, which looks pretty...hmmm...complex. For display purposes I'd rather use the prettyJson method.

    scala> println(schema.prettyJson)
    {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstName",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "lastName",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "email",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "addresses",
        "type" : {
          "type" : "array",
          "elementType" : {
            "type" : "struct",
            "fields" : [ {
              "name" : "city",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            }, {
              "name" : "state",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            }, {
              "name" : "zip",
              "type" : "string",
              "nullable" : true,
              "metadata" : { }
            } ]
          },
          "containsNull" : true
        },
        "nullable" : true,
        "metadata" : { }
      } ]
    }
    

    That's your schema in JSON.
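The compact `json` output and the `prettyJson` output are the same JSON document, differing only in whitespace, so either string should work as the schema argument. A quick check, using an illustrative two-field schema (not the one above) to keep the output short:

```scala
import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Illustrative two-field schema, not the one from the question
val small = new StructType()
  .add("firstName", StringType)
  .add("email", StringType)

val compact: String = small.json      // one line, no insignificant whitespace
val pretty: String = small.prettyJson // the same JSON document, indented for humans

println(compact)
println(pretty)

// A JSON parser ignores insignificant whitespace, so both decode to the same DataType
val fromCompact = DataType.fromJson(compact)
val fromPretty = DataType.fromJson(pretty)
println(fromCompact == fromPretty)
```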

    You can use DataType to "validate" the JSON string with DataType.fromJson, which Spark uses under the covers in from_json.

    import org.apache.spark.sql.types.DataType
    val dt = DataType.fromJson(schemaAsJson)
    scala> println(dt.sql)
    STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
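Because `DataType.fromJson` throws on anything that is not a JSON-encoded data type, the same call can double as an up-front check before `from_json` ever sees the string. A sketch comparing the string from the question with a JSON-encoded one; `validateSchemaJson` is a hypothetical helper, not a Spark API:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Hypothetical helper (not part of Spark): parse the schema string before using it,
// so a bad schema fails early with a clear result instead of deep inside a query
def validateSchemaJson(schemaJson: String): Try[DataType] =
  Try(DataType.fromJson(schemaJson))

// DDL-like string in the style of the question: not JSON, so fromJson rejects it
val fromQuestion = "STRUCT ( firstName: STRING, lastName: STRING )"
// JSON-encoded schema, produced the same way as schemaAsJson above
val jsonEncoded = new StructType().add("firstName", StringType).json

Seq(fromQuestion, jsonEncoded).foreach { s =>
  validateSchemaJson(s) match {
    case Success(dt) => println(s"valid: ${dt.sql}")
    case Failure(e)  => println(s"invalid: ${e.getClass.getSimpleName}")
  }
}
```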
    

    All seems fine. Mind if I check it out with a sample dataset?

    import org.apache.spark.sql.functions.{explode, from_json}
    val rawJsons = Seq("""
      {
        "firstName" : "Jacek",
        "lastName" : "Laskowski",
        "email" : "[email protected]",
        "addresses" : [
          {
            "city" : "Warsaw",
            "state" : "N/A",
            "zip" : "02-791"
          }
        ]
      }
    """).toDF("rawjson")
    val people = rawJsons
      .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
      .select("json.*") // <-- flatten the struct field
      .withColumn("address", explode($"addresses")) // <-- explode the array field
      .drop("addresses")  // <-- no longer needed
      .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
    scala> people.show
    +---------+---------+---------------+------+-----+------+
    |firstName| lastName|          email|  city|state|   zip|
    +---------+---------+---------------+------+-----+------+
    |    Jacek|Laskowski|[email protected]|Warsaw|  N/A|02-791|
    +---------+---------+---------------+------+-----+------+
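To bring this back to the Kafka stream from the question, here is a Scala sketch, assuming the `spark-sql-kafka-0-10` connector is on the classpath and reusing the broker and topic names from the question; `withEmployeeRecord` and `startConsoleQuery` are hypothetical names. Note that a streaming Dataset cannot be displayed with `show()` (Spark reports that queries with streaming sources must be executed with `writeStream.start()`), so the sketch writes to the console sink instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{DataType, StructType}

// Parses the binary Kafka "value" column into a struct column named employeeRecord,
// given a JSON-encoded schema string such as schema.json above.
// Assumes the schema is a struct; anything else fails the cast with a ClassCastException.
def withEmployeeRecord(kafkaDf: DataFrame, schemaJson: String): DataFrame = {
  val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
  kafkaDf
    .withColumn("strValue", col("value").cast("string"))
    .withColumn("employeeRecord", from_json(col("strValue"), schema))
}

// Streaming side: read from Kafka, parse, and print each micro-batch to the console
def startConsoleQuery(spark: SparkSession, schemaJson: String): StreamingQuery = {
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "quickstart:9092")
    .option("subscribe", "simple_topic_6")
    .load()
  withEmployeeRecord(kafkaDf, schemaJson)
    .select("employeeRecord.*") // flatten the struct field
    .writeStream
    .format("console")
    .start()
}
```

Keeping the parsing in a plain `DataFrame => DataFrame` function means it can be tried on a static DataFrame with a binary `value` column before wiring up Kafka.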