Tags: json, scala, databricks, apache-kafka-streams, azure-databricks

Scala schema_of_json function fails in Spark Structured Streaming


I have written a function that reads the JSON value as a string and infers its schema inline, and I call that function from Spark Structured Streaming. I get an error when I do this. The same logic works when I infer the schema first and then use that schema to read the JSON, but it does not work in a single line. How can I fix it?

def processBatch(microBatchOutputDF: DataFrame, batchId: Long) {
  
  TOPICS.split(',').foreach(topic =>{
    var TableName = topic.split('.').last.toUpperCase
    var df = microBatchOutputDF
    
    /*var schema = schema_of_json(df
                                .select($"value")
                                .filter($"topic".contains(topic))
                                .as[String]
                               )*/
    
    var jsonDataDf = df.filter($"topic".contains(topic))
                      .withColumn("jsonData",
                                  // inferring the schema inline from the per-row value column:
                                  // this is the call that triggers the AnalysisException below
                                  from_json($"value",
                                            schema_of_json(lit($"value".as[String])),
                                            scala.collection.immutable.Map[String, String]().asJava))

    var srcTable = jsonDataDf
                    .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")

    srcTable
      .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
      .write
      .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)
    
    spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
  } )
}

Spark streaming code

import org.apache.spark.sql.streaming.Trigger

val StreamingQuery = InputDf
                        .select("*")
                        .writeStream.outputMode("update")
                        .option("queryName", "StreamingQuery")
                        .foreachBatch(processBatch _)
                        .start()
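
The import of Trigger suggests a processing-time trigger was intended. For reference, a minimal sketch of the same query with a trigger and a checkpoint location added (the checkpoint path and the one-minute interval below are assumptions, not values from the post) could look like this:

    import org.apache.spark.sql.streaming.Trigger

    val StreamingQuery = InputDf
      .writeStream
      .outputMode("update")
      .option("queryName", "StreamingQuery")
      // hypothetical checkpoint path; foreachBatch sinks normally need one to restart safely
      .option("checkpointLocation", "/mnt/datalake/checkpoints/kafka_raw")
      // assumed micro-batch interval
      .trigger(Trigger.ProcessingTime("1 minute"))
      .foreachBatch(processBatch _)
      .start()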

Error: org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json/schema_of_csv functions instead of schema_of_json(value)
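
The reason is that from_json needs a schema it can resolve at analysis time, so the second argument must be a DDL string literal or schema_of_json applied to a literal sample, not something computed from the per-row value column. A minimal, self-contained sketch of the difference (the sample JSON below is illustrative, not data from the post):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{from_json, schema_of_json}

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Stand-in for one micro-batch: a single Kafka-style record (made-up payload).
    val df = Seq("""{"payload":{"after":{"id":1,"name":"x"}}}""").toDF("value")

    // Fails with the same AnalysisException: the schema is built from a per-row column.
    // df.withColumn("jsonData", from_json($"value", schema_of_json($"value")))

    // Works: schema_of_json gets one literal JSON sample, so the schema is foldable.
    val sampleJson = """{"payload":{"after":{"id":1,"name":"x"}}}"""
    df.withColumn("jsonData", from_json($"value", schema_of_json(sampleJson))).printSchema()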


Solution

  • This is how I solved it. I create a DataFrame filtered to one topic from the Kafka output DataFrame and apply all of the logic to it, as before. The problem with inferring the schema while reading is that from_json does not know which row of the DataFrame to take the schema from, so the schema has to be built from a single sample message instead; a batch-wide alternative is sketched after the code.

    // Imports this snippet relies on (some may already be in scope in a Databricks notebook)
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.StringType
    import scala.collection.JavaConverters._
    import spark.implicits._

    def processBatch(microBatchOutputDF: DataFrame, batchId: Long) {
      
      TOPICS.split(',').foreach(topic =>{
        var TableName = topic.split('.').last.toUpperCase
        var df = microBatchOutputDF.where(col("topic") === topic)
        
        // Take one sample message from this topic's batch as a plain string;
        // schema_of_json expects a JSON string (or a string literal column), not a whole Dataset.
        // df is already filtered to this topic, so no extra topic filter is needed here.
        var schema = schema_of_json(df
                                    .select($"value")
                                    .as[String]
                                    .first()
                                   )
        
        var jsonDataDf = df.withColumn(
          "jsonData",
          from_json($"value", schema, scala.collection.immutable.Map[String, String]().asJava)
        )
    
        var srcTable = jsonDataDf
                        .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")
    
        srcTable
          .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
          .write
          .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)
        
        spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
      } )
    }
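
A variation, in case the messages for a topic do not all share the exact same fields: instead of sampling one row, let Spark infer a schema that covers the whole filtered micro-batch with spark.read.json and pass the resulting StructType to from_json. This is not part of the original answer, just a common alternative, and it assumes df, spark, and the imports above are in scope as inside processBatch:

    // Infer a schema from every message of this topic in the current micro-batch.
    val jsonDs = df.select($"value").as[String]
    val batchSchema = spark.read.json(jsonDs).schema

    val jsonDataDf = df.withColumn("jsonData", from_json($"value", batchSchema))

The trade-off is an extra pass over the batch to infer the schema, so sampling the first message, as above, stays cheaper when every message of a topic has the same shape.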