Tags: scala, hadoop, apache-spark, hdfs, avro

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord


I am trying to use Spark 1.5.1 (with Scala 2.10.2) to read some .avro files from HDFS (with spark-avro 1.7.7), in order to do some computation on them.

Now, starting from the assumption that I have already searched the web thoroughly for a solution (the best link so far suggests using a GenericRecord, another one reports the same issue, and a third one simply does not work for me, because it shows almost the same code I am already using), I am asking here in case someone has hit the same problem. This is the code:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

object SparkPOC {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("SparkPOC")
      .set("spark.master", "local[4]")
    val sc = new SparkContext(conf)
    val path = args(0)

    // Read the Avro files via the old Hadoop API:
    // key = AvroWrapper[MyRecord], value = NullWritable
    val profiles = sc.hadoopFile(
      path,
      classOf[AvroInputFormat[MyRecord]],
      classOf[AvroWrapper[MyRecord]],
      classOf[NullWritable]
    )

    // Unwrap each record and extract its timestamp
    val timeStamps = profiles.map { p => p._1.datum.getTimeStamp().toString }
    timeStamps.foreach(print)
  }
}

And I get the following message:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Does anybody have a clue? I was also considering spark-avro, but it does not support reading from multiple files at the same time (while .hadoopFile supports wildcards). Otherwise, it seems that I would have to fall back to GenericRecord and use the .get method, losing the advantage of the compiled schema class (MyRecord).
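
For reference, the GenericRecord fallback I am trying to avoid would look roughly like the sketch below (the "timeStamp" field name is only an assumption, matching the getTimeStamp() accessor in my code above):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// Same read as above, but typed against GenericRecord instead of MyRecord
val genericProfiles = sc.hadoopFile(
  path,
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable]
)

// Fields are accessed by name, with no compile-time checking
val timeStamps = genericProfiles.map { p => p._1.datum.get("timeStamp").toString }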

Thanks in advance.


Solution

  • The problem went away after I set the KryoSerializer and a spark.kryo.registrator class, as follows:

    val config = new SparkConf()
      .setAppName(appName)
      .set("spark.master", master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator")
    

    where AvroKryoRegistrator is something like this.
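
    A minimal sketch of such a registrator, assuming the chill-avro helper is on the classpath (the exact serializer registered here is my assumption, not something prescribed by the original answer), could look like this:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.avro.AvroSerializer
    import org.apache.spark.serializer.KryoRegistrator

    // Registers the Avro-generated record class with a binary Avro serializer,
    // so Kryo no longer hands back plain GenericData.Record instances.
    class AvroKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[MyRecord], AvroSerializer.SpecificRecordBinarySerializer[MyRecord])
      }
    }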