scala, apache-spark, spark-structured-streaming

In Spark, how are objects and variables kept in memory and across different executors?

I am using:

  • Spark 3.0.0
  • Scala 2.12

I am working on a Spark Structured Streaming job with a custom stream source. Before the execution of the Spark query, I create a bunch of metadata which is used by my Spark streaming job.

I am trying to understand how this metadata is kept in memory across the different executors.

Example Code:

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

case class JobConfig(fieldName: String, displayName: String, castTo: String)

val jobConfigs: List[JobConfig] = build() // build the job configs

val query = spark
  .readStream
  .format("custom-streaming")
  .load

query
  .writeStream
  .trigger(Trigger.ProcessingTime(2, TimeUnit.MINUTES))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // CustomJobExecutor does data frame transformations and saves the data in PostgreSQL.
    CustomJobExecutor.start(jobConfigs)
  }
  .outputMode(OutputMode.Append())
  .start()
  .awaitTermination()

I need help understanding the following:

In the sample code, how will Spark keep "jobConfigs" in memory across the different executors?

Is there any added advantage of broadcasting?

What is an efficient way of keeping variables which can't be deserialized?


Solution

  • Local variables are copied for each task, whereas broadcast variables are copied only once per executor. From the docs:

    Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

    This means that if your jobConfigs is large enough, and the number of tasks and stages where the variable is used is significantly larger than the number of executors, or if deserialization is time-consuming, then broadcast variables can make a difference. In other cases, they don't.
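
    If broadcasting does make a difference in your case, a minimal sketch of broadcasting jobConfigs could look like the following. The parallelize/mapPartitions part is a hypothetical stand-in for whatever work CustomJobExecutor distributes to the executors:

    import org.apache.spark.broadcast.Broadcast

    // Broadcast once from the driver; each executor receives a single
    // serialized copy, deserialized and cached on first use.
    val jobConfigsBc: Broadcast[List[JobConfig]] = spark.sparkContext.broadcast(jobConfigs)

    val labels = spark.sparkContext
      .parallelize(Seq("a", "b", "c"))
      .mapPartitions { values =>
        // Read the executor-local copy via .value inside task code.
        val configs = jobConfigsBc.value
        values.map(v => s"$v -> ${configs.map(_.fieldName).mkString(",")}")
      }
    labels.collect().foreach(println)

    // Release the executor-side copies once the variable is no longer needed.
    jobConfigsBc.unpersist()

    Note that jobConfigsBc.value must be read inside the task code; referencing jobConfigs directly in the closure would fall back to shipping a copy with every task.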