Tags: scala, apache-spark, apache-spark-sql, spark-shell

Is the SparkSession variable created by spark-shell (Scala) a val or a var?


I am trying to convert my Spark Scala scripts (written in spark-shell) into Scala classes, objects, methods (def), etc., so I can build JARs for spark-submit. I make a lot of Spark SQL calls that perform timestamp computations which depend on the timezone. Because every distributed node may have a different default timezone configured, I have to set the following configuration explicitly to make sure the timezone is always UTC for any subsequent Spark SQL timestamp manipulations by any Spark SQL function calls (code block) within that method.

spark.conf.set("spark.sql.session.timeZone", "UTC")

Should that method's signature include (spark: org.apache.spark.sql.SparkSession) as a parameter, so that I can always start with the explicit statement setting the SparkSession timezone to UTC, without taking any chances (since the distributed Spark nodes may or may not all have the exact same timezone configuration)?

My next question is: how do I find out whether the spark variable set up by spark-shell is a val or a var? In search of an answer, I found the code snippet below, hoping it would tell me whether this Scala variable is immutable or mutable, but it did not tell me whether spark is a var or a val. Do I need to return spark to the method caller after setting spark.sql.session.timeZone to UTC, since I modified it inside my method? Currently my method signature expects two input parameters (org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame) and the output is a tuple (org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame).

scala> def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
manOf: [T](t: T)(implicit evidence$1: Manifest[T])Manifest[T]

scala> manOf(List(1))
res3: Manifest[List[Int]] = scala.collection.immutable.List[Int]

scala> manOf(spark)
res2: Manifest[org.apache.spark.sql.SparkSession] = org.apache.spark.sql.SparkSession

Extra context: As part of launching spark-shell, the variable spark is initialized as follows:

Spark context available as 'sc' (master = yarn, app id = application_1234567890_111111).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
Type in expressions to have them evaluated.
Type :help for more information.

Solution

  • Thanks to @Luis Miguel Mejía Suárez for providing the answers and hints as comments to my question. I ran the following experiment, which shows that spark behaves as a mutable object: inside and outside the method, spark is simply an identical reference to the same object. Although this side effect is not a purely functional implementation, it saves me the trouble of returning the spark object to the caller for subsequent processing. If someone else has a better solution, please do share.

    import org.apache.spark.sql.{DataFrame, SparkSession}

    def x(spark: SparkSession, inputDF: DataFrame): DataFrame = {
      import spark.implicits._
      spark.conf.set("spark.sql.session.timeZone", "UTC") // mutation of the shared SparkSession object inside the method

      // ...timestamp conversions on inputDF via org.apache.spark.sql.functions...
      val finalDF = inputDF // placeholder; the actual transformations are omitted here
      finalDF
    }
    

    Launched spark-shell and executed the following:

    Spark context available as 'sc' (master = yarn, app id = application_1234567890_222222).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
          /_/
    
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.conf.get("spark.sql.session.timeZone")
    res1: String = America/New_York
    
    scala> :load x.scala
    x: (spark: org.apache.spark.sql.SparkSession, inputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
    
    scala> val timeConvertedDF = x(spark, inputDF)
    timeConvertedDF: org.apache.spark.sql.DataFrame = [att1: timestamp, att2: string ... 25 more fields]
    
    scala> spark.conf.get("spark.sql.session.timeZone")
    res4: String = UTC
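
    In other words, whether spark-shell binds spark as a val or a var does not really matter here: a val only fixes the reference, and the session configuration is mutated in place through whatever name points at that object. A tiny sketch (the alias spark2 is purely illustrative):

    val spark2 = spark                                   // a second name bound to the very same SparkSession
    spark2.conf.set("spark.sql.session.timeZone", "UTC")
    spark.conf.get("spark.sql.session.timeZone")         // "UTC" -- both names observe the same mutated object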