Tags: scala, apache-spark, hadoop, serialization

What is the best way to call Hadoop FileSystem operations from a Serializable Scala object?


/What I am trying to do/

I want to run a Spark UDF transformation over several HDFS buckets containing BZ2 files. I have defined a MyMain Scala object that extends Serializable, since it calls the UDF transformation on each of these HDFS buckets.

However, before running the UDF transformations, I need to filter out the HDFS buckets that actually contain BZ2 file(s). This requires Hadoop FileSystem operations, which I have kept inside the MyMain.main method so that they run only in driver memory and are not distributed to worker nodes, because, as I understand it, FileSystem is not serializable.

However, even after moving the FileSystem operations into a separate serializable HadoopUtils class, accessing it through a singleton companion object, and calling all of the FileSystem operations from MyMain.main, I am still getting the "Task not serializable" exception shown below.

/Question(s)/

What is the correct way to call non-serializable FileSystem operations from a Serializable object such as MyMain? Also, why does class HadoopUtils extends Serializable not appear to be serializable, even though it is declared as such?
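
For illustration only (not the code from this question): one commonly used pattern for holding a non-serializable handle such as FileSystem inside a Serializable class is to declare it @transient lazy val, so it is skipped during serialization and re-created on whichever JVM first touches it. A minimal sketch, assuming the cluster's Hadoop configuration is available on each node's classpath (the class name HadoopFsHelper is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class HadoopFsHelper extends Serializable {
  // FileSystem is not serializable, so keep it out of serialization and
  // rebuild it lazily wherever this instance is actually used.
  @transient private lazy val fs: FileSystem = FileSystem.get(new Configuration())

  def existsDir(path: String): Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
}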

/My Code/

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1 + _)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String) : Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
  def ifBZFileExists(fs: FileSystem, bucketBZDir: String) : Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => { p.isFile && p.getPath.getName.endsWith(".bz2")}
    )
    !fileStatus.isEmpty
  }

  def getBZ2BucketPaths(fs: FileSystem, lookupPath: String) : List[String] = {
    //Filter the list of buckets having at least one BZ2 file in it
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket",range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    //From Bucket1 to Bucket16, keep only the buckets that actually exist (e.g. Bucket5 may not exist)
    val existingBuckets = allBuckets.filter(p => { existsDir(fs,p) })
    val BZ2BucketPaths = existingBuckets.filter(path => { ifBZFileExists(fs,path) }).map(
        path => { path + "/*.bz2" })
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"
  def main(args: Array[String]): Unit = {
    //NOTE: spark, hadoopfs defined in main so as to be processed in Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()

    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val BZ2Buckets = 
      HadoopUtils.getHadoopUtils.getBZ2BucketPaths(hadoopfs,clusterNameNodeURL + basePath)

    BZ2Buckets.foreach(path => {
      //Spark UDF transformations on each bucket go here; they need to be serialized (a sketch follows this listing)
    })


  }
}
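
For context, the stack trace below ends in Dataset.show, so the body of that foreach presumably does something along the lines of the following hypothetical sketch (the actual transformation is not shown in the question); it is the closure of this kind of Dataset/UDF code that Spark tries to serialize when the action runs. Here spark and path are the values already in scope inside the foreach above:

import org.apache.spark.sql.functions.udf

// Hypothetical placeholder for the real per-bucket transformation:
// read the BZ2 files from one bucket and apply a trivial UDF.
val toUpper = udf((s: String) => s.toUpperCase)
val bucketDF = spark.read.textFile(path)            // "<bucket>/*.bz2" is decompressed transparently
bucketDF.select(toUpper(bucketDF("value"))).show()  // the action that triggers closure serialization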

/Stack Trace of the Exception/

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:197)
  ... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
    - object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
    - field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
    - object (class $iw, $iw@3f4a0d43)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@74d06d1e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@f9764ea)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6821099e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4f509444)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11462802)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11d2d501)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@284fd700)
    - field (class: $line14.$read, name: $iw, type: class $iw)
    - object (class $line14.$read, $line14.$read@46b4206a)
    - field (class: $iw, name: $line14$read, type: class $line14.$read)
    - object (class $iw, $iw@33486894)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@25980fc9)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1fb0d28d)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42ea11d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42d28cc1)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@22131a73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@631878e1)
    - field (class: $line18.$read, name: $iw, type: class $iw)
    - object (class $line18.$read, $line18.$read@561c52c0)
    - field (class: $iw, name: $line18$read, type: class $line18.$read)
    - object (class $iw, $iw@1d5b8be2)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@4de4c672)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 85 more

Solution

  • It seems the Task not serializable issue was not caused by the FileSystem operations in the HadoopUtils class or its companion object as such. Because, in the driver program, the instance of the HadoopUtils class is accessed through the singleton HadoopUtils companion object (i.e. HadoopUtils.getHadoopUtils), that companion object (HadoopUtils$ in the stack trace) gets captured, so it has to be serializable along with the MyMain object, as sketched below.

    The solution to the issue can be referenced here
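
A minimal sketch of that adjustment, assuming the closure reaches HadoopUtils through its companion object (the Caused by line names HadoopUtils$, which is the companion object), is to declare the companion object Serializable as well:

// Sketch: the class was already Serializable, but the singleton companion
// object (HadoopUtils$ in the stack trace) was not, so any closure that
// captured it could not be serialized. Marking it Serializable addresses that.
object HadoopUtils extends Serializable {
  val getHadoopUtils = new HadoopUtils
}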