Spark - How to make a map serializable


I need to extract some information from a big dataset and transform it; the result will later be consumed by another dataset.

Since the information to be consumed is always the same, and since it can be stored as key-value pairs, I was considering simply saving it in a lookup map that a udf can consume, so that I avoid repeated calls to the big dataset.

The problem is I am getting the following error:

org.apache.spark.SparkException: Task not serializable

Is there any way to make the map serializable?

If that is not possible, is there another way to store information in a lookup object in Spark?

Here is my code:

import org.apache.spark.sql.functions.{hour, udf}
import spark.implicits._ // enables the $"colName" syntax

val cityTimeZone: scala.collection.immutable.Map[String, Double] = Map(
  "CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0, "MUC" -> 1.0, "SGN" -> 7.0,
  "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0,
  "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)

def getLocalHour = udf((city: String, timeutc: Int) => {
  val timeOffset = cityTimeZone(city)
  // % keeps the sign of the dividend, so wrap negative results back into [0, 24)
  val shifted = (timeutc + timeOffset) % 24
  val localtime = if (shifted >= 0) shifted else shifted + 24
  localtime
})

// $"dateutc" is a timestamp column (e.g. 2017-03-01 03:45:00) and $"city" is a three-letter uppercase code like those in the map above

val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))

display(newDF)
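The hour arithmetic inside the udf can be checked on its own, without Spark. This is a minimal sketch; localHour is a hypothetical helper name, not part of the original code. Scala's % keeps the sign of the dividend, so a negative sum has to be shifted back by 24 rather than having its sign flipped (flipping gives 3 instead of 21 for JFK at 02:00 UTC).

```scala
object LocalHourDemo {
  // Hypothetical helper: shift a UTC hour by a city's offset and wrap into [0, 24).
  // (utcHour + offset) % 24 can be negative because % keeps the dividend's sign,
  // so negative results get 24 added back.
  def localHour(utcHour: Int, offset: Double): Double = {
    val shifted = (utcHour + offset) % 24
    if (shifted >= 0) shifted else shifted + 24
  }

  def main(args: Array[String]): Unit = {
    println(localHour(2, -5.0))  // 21.0 (JFK at 02:00 UTC)
    println(localHour(20, 8.0))  // 4.0  (CEB at 20:00 UTC)
    println(localHour(10, 0.0))  // 10.0 (LGW at 10:00 UTC)
  }
}
```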

Solution

  • The member variable declaration

    val cityTimeZone  
    

    in combination with

    cityTimeZone(city)
    

    inside the udf is problematic, because the latter is just a shortcut for

    this.cityTimeZone(city)
    

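    where this is (presumably) some huge non-serializable object, probably because it contains a reference to the non-serializable SparkContext. Spark has to serialize the udf's closure to ship it to the executors, and because the closure captures this, serialization fails with "Task not serializable".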

    Make getLocalHour a lazy val, and move the map that is needed by the udf inside the definition of getLocalHour as a local variable, something along these lines:

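    lazy val getLocalHour = {
      // Local copy: the closure below captures only this Map, not the enclosing object
      val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0) // ... remaining entries
      udf((city: String, timeutc: Int) => {
        val timeOffset = cityTimeZone(city)
        // % keeps the sign of the dividend, so wrap negative results back into [0, 24)
        val shifted = (timeutc + timeOffset) % 24
        val localtime = if (shifted >= 0) shifted else shifted + 24
        localtime
      })
    }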
    

    Alternatively, attach cityTimeZone to some serializable object (i.e. some object that does not contain references to any threads, sockets, spark contexts and all the other non-serializable stuff; e.g. package objects with utility methods and constants would be fine).

    If the udf definition contains references to any other member variables, handle them the same way: copy them into local vals or move them into a serializable object.
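The capture problem described above can be reproduced without Spark, because Spark serializes closures with plain Java serialization. This is a minimal sketch, assuming Scala 2.12 or later (where lambdas are serializable and a lambda body that does not use this does not capture it). Holder is a hypothetical stand-in for the non-serializable enclosing object, and TimeZones is a hypothetical object playing the role of the "serializable object" alternative: the lambda reaches it through a static reference, so nothing has to be serialized for it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical object holding the lookup map; accessed statically, never captured.
  object TimeZones {
    val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "JFK" -> -5.0)
  }

  // Hypothetical stand-in for a non-serializable enclosing object
  // (e.g. a notebook wrapper that holds a SparkContext). Not Serializable.
  class Holder {
    val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "JFK" -> -5.0)

    // Uses the member, i.e. this.cityTimeZone, so the lambda captures the Holder.
    def capturingThis: String => Double = city => cityTimeZone(city)

    // Copies the member into a local val first; the lambda captures only the Map.
    def capturingLocal: String => Double = {
      val tz = cityTimeZone
      city => tz(city)
    }

    // Reads the map from a static object; the lambda captures nothing.
    def capturingObject: String => Double = city => TimeZones.cityTimeZone(city)
  }

  // Attempts Java serialization, which is what Spark uses for closures.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(isSerializable(h.capturingThis))   // false: drags in the Holder
    println(isSerializable(h.capturingLocal))  // true: only the Map is captured
    println(isSerializable(h.capturingObject)) // true: nothing is captured
  }
}
```

The second and third cases correspond to the two fixes suggested above: a local copy inside the udf definition, or a map attached to an object that the closure does not need to capture.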