Tags: apache-spark, spark-structured-streaming

How to start multiple streaming queries in a single Spark application?


I have built a few Spark Structured Streaming queries to run on EMR. They are long-running ETL-type queries and need to run at all times. When I submit a job to the YARN cluster on EMR, I can submit a single Spark application, so that one Spark application should run multiple streaming queries.

I am confused about how to build and start multiple streaming queries within the same submit programmatically.

For example, I have this code:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
    Type1SparkJobBuilder(prop).build().awaitTermination()
    Type2SparkJobBuilder(prop).build().awaitTermination()
  }
}

I fire this in my main class with SparkJobs(new Properties()).run().

When I look in the Spark History Server, only the first streaming query (Type1SparkJob) is running.

What is the recommended way to fire multiple streaming queries within the same spark-submit programmatically? I could not find proper documentation on this either.


Solution

  • Since you're calling awaitTermination on the first query, it blocks until that query completes before the second query ever starts. Instead, kick off both queries, then use StreamingQueryManager.awaitAnyTermination to block on all of them:

    // start() returns immediately; neither call blocks
    val query1 = df.writeStream.start()
    val query2 = df.writeStream.start()

    // blocks until either query terminates (with or without an error)
    spark.streams.awaitAnyTermination()
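
    Applied to the code in the question, that means starting both jobs before blocking on either one. A minimal sketch, assuming Type1SparkJobBuilder(prop).build() and Type2SparkJobBuilder(prop).build() each return a started StreamingQuery and that a SparkSession named spark is in scope:

    case class SparkJobs(prop: Properties) extends Serializable {
      def run() = {
        // start both queries first; build() returns a started
        // StreamingQuery, so neither call blocks
        Type1SparkJobBuilder(prop).build()
        Type2SparkJobBuilder(prop).build()
        // block until any of the started queries terminates
        spark.streams.awaitAnyTermination()
      }
    }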
    

    In addition to the above, Spark uses the FIFO scheduler by default, which means the first query gets all the resources in the cluster while it's executing. Since you're trying to run multiple queries concurrently, you should switch to the FAIR scheduler.
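
    For example, the scheduler mode can be set when building the session (it must be set before the SparkContext is created), or via --conf spark.scheduler.mode=FAIR on spark-submit; the app name below is just a placeholder:

    import org.apache.spark.sql.SparkSession

    // FAIR scheduling lets concurrent queries share executors instead of
    // the first query hogging them under FIFO
    val spark = SparkSession.builder()
      .appName("streaming-etl") // hypothetical name
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()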

    If some queries should get more resources than others, you can also tune the individual scheduler pools, as sketched below.
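
    A sketch of per-query pools, assuming two input DataFrames heavyDf and lightDf; the pool names are hypothetical, and their weights/minShare would be declared in a fairscheduler.xml file referenced by spark.scheduler.allocation.file:

    // a query runs in the pool set on the thread that starts it
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl_heavy")
    val heavy = heavyDf.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/chk/heavy") // placeholder paths
      .start("/tmp/out/heavy")

    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl_light")
    val light = lightDf.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/chk/light")
      .start("/tmp/out/light")

    spark.streams.awaitAnyTermination()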