/What I am trying/
I want to run some Spark UDF transformations on several HDFS buckets containing BZ2 files. I have defined a MyMain Scala object that extends Serializable, since it calls the UDF transformations on each of these HDFS buckets.
However, before doing the UDF transformations, I need to filter down to the HDFS buckets that actually contain BZ2 file(s). This requires Hadoop FileSystem operations, which I have kept in the MyMain.main method so that they run in driver memory and are not distributed to worker nodes, because, as I understand it, FileSystem is not serializable.
However, even after I moved the FileSystem operations into a separate serializable HadoopUtils class, made a singleton companion object for it, and called all the FileSystem operations from MyMain.main, I am still getting a "Task not serializable" exception (stack trace below).
/Question(s)/
What is the correct way to call non-serializable FileSystem operations from a serializable object like MyMain? Also, why does class HadoopUtils extends Serializable still fail to serialize, even though it is declared as Serializable?
/My Code/
import org.apache.hadoop.fs.{FileSystem, Path}

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1 + _)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String): Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }

  def ifBZFileExists(fs: FileSystem, bucketBZDir: String): Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => p.isFile && p.getPath.getName.endsWith(".bz2")
    )
    !fileStatus.isEmpty
  }

  def getBZ2Buckets(fs: FileSystem, lookupPath: String): List[String] = {
    // Filter the list of buckets having at least one BZ2 file in it
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket", range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    // From Bucket1 to Bucket16, keep only the buckets that exist, e.g. Bucket5 may not exist
    val existingBuckets = allBuckets.filter(p => existsDir(fs, p))
    val BZ2BucketPaths = existingBuckets
      .filter(path => ifBZFileExists(fs, path))
      .map(path => path + "/*.bz2")
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}
import org.apache.spark.sql.SparkSession

object MyMain extends Serializable {

  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"

  def main(args: Array[String]): Unit = {
    // NOTE: spark and hadoopfs are defined inside main so that they are processed in the driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()
    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val BZ2Buckets =
      HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)
    BZ2Buckets.foreach(path => {
      // Doing Spark UDF transformations on each bucket, which need to be serialized
    })
  }
}
/Stack Trace of the Exception/
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:197)
... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
- object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
- field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
- object (class $iw, $iw@3f4a0d43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74d06d1e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f9764ea)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6821099e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4f509444)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11462802)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11d2d501)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@284fd700)
- field (class: $line14.$read, name: $iw, type: class $iw)
- object (class $line14.$read, $line14.$read@46b4206a)
- field (class: $iw, name: $line14$read, type: class $line14.$read)
- object (class $iw, $iw@33486894)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@25980fc9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1fb0d28d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42ea11d5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42d28cc1)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@22131a73)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@631878e1)
- field (class: $line18.$read, name: $iw, type: class $iw)
- object (class $line18.$read, $line18.$read@561c52c0)
- field (class: $iw, name: $line18$read, type: class $line18.$read)
- object (class $iw, $iw@1d5b8be2)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4de4c672)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 85 more
/Update/
It seems the "Task not serializable" issue was not related to the HadoopUtils class or object as such. Rather, since the driver program accesses the instance of the HadoopUtils class through the singleton HadoopUtils companion object, i.e. HadoopUtils.getHadoopUtils, the HadoopUtils class needs to be serialized along with the MyMain object. The solution to the issue can be referenced here.
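For illustration only (this is a standalone sketch, not the linked solution): the Caused by line names HadoopUtils$, which is the companion object, not the class. Declaring class HadoopUtils extends Serializable says nothing about the singleton, so one change consistent with the stack trace is to make the companion object extend Serializable as well. The javaSerializable helper below is a hypothetical check that mimics what Spark's ClosureCleaner does by default (plain Java serialization):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Minimal model of the situation: the class is Serializable, but the
// companion object is an instance of a *different* class (HadoopUtils$),
// which must be made Serializable separately.
class HadoopUtils extends Serializable

// Sketch of the fix: the companion object also extends Serializable, so a
// closure that captures the singleton can be serialized by Spark.
object HadoopUtils extends Serializable {
  val getHadoopUtils = new HadoopUtils
}

object SerializationCheck {
  // Returns true if `obj` survives Java serialization, the mechanism
  // Spark uses by default when shipping a task closure to executors.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(javaSerializable(HadoopUtils))                // the companion object
    println(javaSerializable(HadoopUtils.getHadoopUtils)) // the class instance
  }
}
```

If the line extends Serializable is removed from the companion object, the first check fails with the same java.io.NotSerializableException: HadoopUtils$ seen in the stack trace.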