scala · apache-spark · rdd · serializable

NotSerializableException in Spark


Most of the non-serializable issues discussed online involve very basic data passed to sc.parallelize(), with the failure surfacing in a map; in my case, the element type itself is the problem. I have a specific data type that comes from a third-party library and is not serializable, so the following throws NotSerializableException:

val data: RDD[ThirdPartyLib.models.XData] = sc.parallelize(ThirdPartyLib.getX)

data.foreachPartition(rows => {
  rows.foreach(row => {
    println("value: " + row.getValue)    
  })  
})

As a solution, I created the same model class (XData) internally, made it serializable, and did this:

val data: RDD[XData] = (sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

data.foreachPartition(rows => {
  rows.foreach(row => {
    println("value: " + row.getValue)    
  })  
})

I was expecting the issue to be resolved, but I am still getting the same error: [ERROR] org.apache.spark.util.Utils logError - Exception encountered java.io.NotSerializableException: ThirdPartyLib.models.XData. Shouldn't the problem be resolved now that I created that internal serializable type? How can I fix this?


Solution

  • So with

    (sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

    you parallelize first and then cast, so Spark still requires ThirdPartyLib.models.XData to be serializable. Also, that cast would probably explode at runtime, because the elements are still ThirdPartyLib.models.XData instances and not your XData.

    I think this should do the trick (a sketch of the converter follows below):

    def convertThirdPartyXDataToMyXData(xd: ThirdPartyLib.models.XData): XData = ???

    val data: RDD[XData] = sc.parallelize(ThirdPartyLib.getX.map(convertThirdPartyXDataToMyXData)) // assuming the collection returned by getX has a map
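
    For completeness, here is a minimal sketch of the converter and the serializable model. It assumes the third-party XData exposes only the getValue accessor used in the question; the real fields are unknown, so adjust to match:

    // Hypothetical serializable mirror of ThirdPartyLib.models.XData.
    // Case classes are serializable by default, so no explicit
    // `extends Serializable` is needed.
    case class XData(value: String) {
      def getValue: String = value
    }

    def convertThirdPartyXDataToMyXData(xd: ThirdPartyLib.models.XData): XData =
      XData(xd.getValue) // copy only plain, serializable fields

    Because the map runs on the driver before parallelize, only the serializable XData copies are ever shipped to the executors, and the foreachPartition from the question then works unchanged.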