Tags: scala, apache-spark, xml-deserialization

Scala: XStream complains object not serializable


I have the following case classes defined, and I would like to print each ClientData in XML format using XStream.

case class Address(addressLine1: String,
                   addressLine2: String,
                   city: String,
                   provinceCode: String,
                   country: String,
                   addressTypeDesc: String) extends Serializable

case class ClientData(title: String,
                      firstName: String,
                      lastName: String,
                      addrList: Option[List[Address]]) extends Serializable


import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver

object ex1 {
  def main(args: Array[String]): Unit = {
    ...
    ...
    ...

    // Below, newClientRecord is an RDD whose elements x are Try[ClientData]
    val xstream = new XStream(new DomDriver)
    newClientRecord.foreach(x => if (x.isSuccess) println(xstream.toXML(x.get)))
  }
}

When the program executes the line that prints each ClientData in XML format, I get the runtime error below. Please help.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    at lab9$.main(lab9.scala:63)
    at lab9.main(lab9.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: com.thoughtworks.xstream.XStream
Serialization stack:
    - object not serializable (class: com.thoughtworks.xstream.XStream, value: com.thoughtworks.xstream.XStream@51e94b7d)
    - field (class: lab9$$anonfun$main$1, name: xstream$1, type: class com.thoughtworks.xstream.XStream)
    - object (class lab9$$anonfun$main$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 16 more

Solution

  • It isn't XStream that's complaining; it's Spark. The foreach closure captures the xstream variable, so Spark has to serialize the closure to ship the task to the executors, and XStream instances aren't serializable (that's the java.io.NotSerializableException: com.thoughtworks.xstream.XStream in the stack trace). You need to define the xstream variable inside the task:

    newClientRecord.foreach { x =>
      if (x.isSuccess) {
        val xstream = new XStream(new DomDriver) // created inside the task, on the executor
        println(xstream.toXML(x.get))
      }
    }
    

    if XStream is sufficiently cheap to create, or create one instance per partition otherwise:

    newClientRecord.foreachPartition { xs =>
      val xstream = new XStream(new DomDriver) // one instance per partition
      xs.foreach { x =>
        if (x.isSuccess) {
          println(xstream.toXML(x.get))
        }
      }
    }
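
    Note that in both versions println runs on the executors, so on a real cluster the XML lands in the executor stdout logs, not the driver console. If you want the output back on the driver, one sketch (assuming the rendered strings are small enough to collect into driver memory) is to build the XML on the executors and collect it:

    import scala.util.Success

    val xmlStrings = newClientRecord.mapPartitions { xs =>
      val xstream = new XStream(new DomDriver) // one instance per partition
      xs.collect { case Success(client) => xstream.toXML(client) } // render only the successes
    }
    xmlStrings.collect().foreach(println) // runs on the driver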
    

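    Another common pattern, sketched here with a hypothetical XmlSerializer object (not something from XStream itself), is to hold the XStream instance in a singleton object. The closure then refers to the object rather than capturing a field, so Spark has nothing to serialize, and each JVM (driver or executor) builds its own instance lazily on first use:

    import com.thoughtworks.xstream.XStream
    import com.thoughtworks.xstream.io.xml.DomDriver

    // Hypothetical holder; must be top-level so the closure references
    // it statically instead of capturing an enclosing instance.
    object XmlSerializer {
      lazy val xstream = new XStream(new DomDriver) // built once per JVM, on first use
    }

    newClientRecord.foreach { x =>
      if (x.isSuccess) println(XmlSerializer.xstream.toXML(x.get))
    }

    XStream documents its instances as thread-safe once configuration is done, so sharing one per executor JVM across tasks should be fine.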