
How to read streaming data in XML format from Kafka?


I am trying to read XML data from a Kafka topic using Spark Structured Streaming.

I tried using the Databricks spark-xml package, but I got an error saying that this package does not support streamed reading. Is there any way I can extract XML data from a Kafka topic using Structured Streaming?

My current code:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

Solution

  • .format("kafka") \
    .format('com.databricks.spark.xml') \
    

    The last format call, the one with com.databricks.spark.xml, wins and becomes the streaming source (hiding Kafka as the source).

    In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.

    As you have seen, the Databricks spark-xml package does not support streamed reading, i.e. it cannot act as a streaming source.
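    A minimal PySpark sketch of a reader that keeps Kafka as the only source, assuming the broker address and topic name from the question. The function name kafka_source is hypothetical, and the sketch assumes the Kafka integration package (spark-sql-kafka-0-10) is available to the Spark session. Note that the Kafka source expects the option name "subscribe" for a list of topics.

```python
def kafka_source(spark, servers="localhost:9092", topic="test"):
    """Return a streaming DataFrame backed by Kafka only.

    `spark` is an existing SparkSession; the defaults mirror the question.
    """
    return (
        spark.readStream
        .format("kafka")  # a single format() call, so Kafka stays the source
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)  # topic(s) to read from
        .load()
    )
```

    The resulting DataFrame exposes the raw message in the binary value column, so the XML still has to be parsed in a later step.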

    Is there any way I can extract XML data from Kafka topic using structured streaming?

    You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.

    That should not be a big deal anyway. Scala code could look as follows.

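    import org.apache.spark.sql.functions.udf
    import spark.implicits._  // enables the 'value column syntax

    val input = spark.
      readStream.
      format("kafka").
      ...
      load
    
    // Kafka delivers the payload as binary, so cast it to a string first
    val values = input.select('value cast "string")
    
    // ??? is a placeholder for your own XML parsing logic
    val extractValuesFromXML = udf { (xml: String) => ??? }
    val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))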
    
    // print XMLs and numbers to the stdout
    val q = numbersFromXML.
      writeStream.
      format("console").
      start
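
    Since the question uses PySpark, the same idea can be sketched in Python. This is only an illustration under assumptions: the <number> tag, the helper names extract_number and with_number_column, and the choice to return null for malformed XML are all hypothetical and should be adapted to the real payload. Parsing uses the standard library's xml.etree.ElementTree.

```python
import xml.etree.ElementTree as ET


def extract_number(xml_text):
    """Return the text of the first <number> element, or None.

    The <number> tag is a hypothetical example; adapt it to the real payload.
    """
    if xml_text is None:
        return None
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return None  # malformed record: yield null rather than fail the query
    # iter() walks the root and all its descendants in document order
    node = next(root.iter("number"), None)
    return node.text if node is not None else None


def with_number_column(kafka_df):
    """Add a "number" column parsed from the Kafka message value."""
    # Imported lazily so extract_number can be exercised without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    extract_number_udf = udf(extract_number, StringType())
    # Kafka delivers the payload as binary, so cast it to a string first
    values = kafka_df.select(col("value").cast("string").alias("value"))
    return values.withColumn("number", extract_number_udf(col("value")))
```

    Given a streaming DataFrame read from Kafka, with_number_column(df).writeStream.format("console").start() would print each XML document next to the extracted value, mirroring the Scala version.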
    

    Another option is to write your own custom streaming Source that handles the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That should work as well.