
Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema


I am creating an Avro RDD with the following code.

 def convert2Avro(data: String, schema: Schema): AvroKey[GenericRecord] = {
   val wrapper = new AvroKey[GenericRecord]()
   val record = new GenericData.Record(schema)
   record.put("empname", "John")
   wrapper.datum(record)
   wrapper
 }

and I create the Avro RDD as follows:

 val avroRDD = fieldsRDD.map(x => convert2Avro(x, schema))

When this runs, I get the following exception at the line above:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

Any pointers?


Solution

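  • Schema$RecordSchema (the concrete class behind a record Schema) does not implement java.io.Serializable in the Avro version used here, so Spark cannot serialize the map closure that captures it and ship it to the executors. One workaround is to convert the schema to its JSON string with schema.toString, capture that string in the closure instead, and reconstruct the Schema object inside the method.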

    val schemaString = schema.toString
    val avroRDD = fieldsRDD.map(x => convert2Avro(x, schemaString))
    

    Inside the method, reconstruct the schema from the string:

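    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.avro.mapred.AvroKey

    def convert2Avro(data: String, schemaString: String): AvroKey[GenericRecord] = {
      // Rebuild the Schema on the executor from its JSON form
      val schema = new Schema.Parser().parse(schemaString)
      val record = new GenericData.Record(schema)
      record.put("empname", "John")
      new AvroKey[GenericRecord](record)
    }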
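To see why passing the string works, here is a minimal, Spark-free sketch that mimics the check in the trace (ClosureCleaner.ensureSerializable), which attempts plain Java serialization of the closure given to map. FakeSchema is a hypothetical stand-in for an Avro Schema that does not implement java.io.Serializable, and ClosureSerializationDemo, isSerializable, closureCapturingSchema and closureCapturingString are illustrative names, not Spark or Avro APIs. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas that carry their captured values.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for an Avro Schema that does not implement java.io.Serializable.
final class FakeSchema(val json: String) {
  override def toString: String = json
}

object ClosureSerializationDemo {
  // Roughly the question Spark's closure check asks:
  // can this object be written with plain Java serialization?
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj)
      finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures the FakeSchema instance itself, like the question's map call.
  def closureCapturingSchema(schema: FakeSchema): String => String =
    x => x + schema.json

  // Captures only the schema's String form, like the workaround above.
  def closureCapturingString(schemaString: String): String => String =
    x => x + schemaString

  def main(args: Array[String]): Unit = {
    val schema = new FakeSchema("""{"type":"record","name":"Emp","fields":[]}""")
    println(isSerializable(closureCapturingSchema(schema)))          // false
    println(isSerializable(closureCapturingString(schema.toString))) // true
  }
}
```

The first closure fails for the same reason as the original map call: its captured value has no serialized form. A String is Serializable, so the second closure passes, and the Schema is only rebuilt after the closure has reached the executor.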