Search code examples
scalaapache-sparkmap-function

"NotSerializableException " in scala map function


I am reading a file and try to map the values using a function. But it is giving a error NotSerializableException Below is the code I am running:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  def parseLine(line: String) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)
}
}

When I run the above code it is giving me below error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.map(RDD.scala:370) at com.sundogsoftware.spark.MinTemperatures$.main(MinTemperatures.scala:32) at com.sundogsoftware.spark.MinTemperatures.main(MinTemperatures.scala)

Caused by: java.io.NotSerializableException:

com.sundogsoftware.spark.MinTemperatures$ Serialization stack: - object not serializable (class: com.sundogsoftware.spark.MinTemperatures$, value: com.sundogsoftware.spark.MinTemperatures$@41fed14f) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.sundogsoftware.spark.MinTemperatures$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/sundogsoftware/spark/MinTemperatures$.$anonfun$main$1:(Lcom/sundogsoftware/spark/MinTemperatures$;Ljava/lang/String;)Lscala/Tuple3;, instantiatedMethodType=(Ljava/lang/String;)Lscala/Tuple3;, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda)

But when I run the same code with anonymous function it is running successfully:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(x => {
      val fields = x.split(",");
      val stationID = fields(0);
      val entryType = fields(2);
      val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f;
      (stationID, entryType, temperature)
    })

    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")

    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))

    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey((x, y) => min(x, y))

    // Collect, format, and print the results
    val results = minTempsByStation.collect()

    for (result <- results.sorted) {
      val station = result._1
      val temp = result._2
      val formattedTemp = f"$temp%.2f F"
      println(s"$station minimum temperature: $formattedTemp")
    }

  }
}

Output:

EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F

As you seen above, when I am using named function (parseLine) inside map it is giving error, but the same program instead of named function when I used anonymous function in map it is running successfully.

I looked into few blogs but didn't get the reason for the error. Could anyone help me to understand this?


Solution

  • This issue doesn't seem to be sbt or dependency related, as I checked, this happens when the script is not defined as object (Scala objects are serialize-able by default) so this error means the script is not serializable. I created a new object and pasted the same code. It worked.