apache-spark, spark-structured-streaming

Continuous trigger not found in Structured Streaming


Runtime: Spark 2.3.0, Scala 2.11 (Databricks 4.1 ML beta)

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

// Kafka settings and df definition go here

val query = df.writeStream.format("parquet")
  .option("path", ...)
  .option("checkpointLocation", ...)
  .trigger(continuous(30000))
  .outputMode(OutputMode.Append)
  .start

This throws the error: not found: value continuous

Other attempts that did not work:

.trigger(continuous = "30 seconds") // as per the Databricks blog
// throws the same error as above

.trigger(Trigger.Continuous("1 second")) // as per the Spark docs
// throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)

References:

(Databricks blog) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

(Spark guide) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing

(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package


Solution

  • Spark 2.3.0 does not support the parquet sink in continuous processing mode; you have to write to one of the supported sinks, such as Kafka, memory, or console (see the sketch below the quote).

    To quote the Databricks blog post on continuous processing mode in Structured Streaming:

    You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console.
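
    For example, here is a minimal sketch of a continuous query that keeps the Kafka source but writes to the console sink instead of parquet. The broker address, topic name, and checkpoint path are placeholder assumptions, not values from the question:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val spark = SparkSession.builder
      .appName("continuous-demo")
      .getOrCreate()

    // Hypothetical Kafka source; replace broker and topic with your own.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()

    // Console is a supported sink, so Trigger.Continuous is accepted here.
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/continuous-demo-checkpoint")
      .outputMode(OutputMode.Append)
      .trigger(Trigger.Continuous("30 seconds"))
      .start()

    query.awaitTermination()

    Note that in continuous mode the trigger interval is the checkpoint interval (how often Spark records query progress), not a micro-batch interval; records are processed as soon as they arrive.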