Tags: hadoop, apache-spark, apache-spark-sql, hadoop2, kryo

How to register InternalRow with Kryo in Spark


I want to run Spark with Kryo serialisation, so I set spark.serializer=org.apache.spark.serializer.KryoSerializer and spark.kryo.registrationRequired=true.
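For reference, the same two settings can also be applied in code. This is a minimal sketch, assuming Spark 2.x or later. The object name KryoSettings is hypothetical. The settings have to be on the SparkConf before the SparkContext or SparkSession is created:

```scala
import org.apache.spark.SparkConf

object KryoSettings {
  // The two settings from the question; set them before any SparkContext/SparkSession exists
  val conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Fail fast on any class that has not been registered with Kryo
    .set("spark.kryo.registrationRequired", "true")
}
```

Equivalently, they can be passed to spark-submit as --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrationRequired=true.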

When I then run my code I get the error:

Class is not registered: org.apache.spark.sql.catalyst.InternalRow[]

Following this post, I used:

sc.getConf.registerKryoClasses(Array( classOf[ org.apache.spark.sql.catalyst.InternalRow[_] ] ))

But then the error is:

org.apache.spark.sql.catalyst.InternalRow does not take type parameters
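Both errors point at the same thing. In Kryo's message, the "[]" suffix denotes a Java array of InternalRow. In Scala, a Java array T[] is written Array[T]. InternalRow itself has no type parameters, which is why InternalRow[_] is rejected. The sketch below illustrates this in plain Scala, using java.lang.String as a stand-in for InternalRow so that it runs without Spark on the classpath. The object name ArrayClassNames is hypothetical:

```scala
object ArrayClassNames {
  // java.lang.String stands in for InternalRow: both are ordinary, non-generic classes
  val elementClass: Class[String] = classOf[String]
  // Array is the generic type here: the JVM's String[] is written Array[String] in Scala
  val arrayClass: Class[Array[String]] = classOf[Array[String]]

  def main(args: Array[String]): Unit = {
    println(arrayClass.isArray)                          // true
    println(arrayClass.getComponentType == elementClass) // true
    println(arrayClass.getSimpleName)                    // String[]
    println(arrayClass.getName)                          // [Ljava.lang.String;
    // classOf[String[_]] would not compile, for the same reason InternalRow[_] does not
  }
}
```

By the same logic, the class named in the first error is classOf[Array[org.apache.spark.sql.catalyst.InternalRow]].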


Solution

  • You should use a separate registrator class that extends KryoRegistrator:

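    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.sql.catalyst.InternalRow

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // "InternalRow[]" in the error message is Array[InternalRow] in Scala
        kryo.register(classOf[Array[InternalRow]])
      }
    }
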

    Source: http://spark.apache.org/docs/0.6.0/tuning.html

    Or, if you want to register the class directly in your Spark application code:

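    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.InternalRow

    val cls: Class[Array[InternalRow]] = classOf[Array[InternalRow]]
    // Register before the session exists: spark.sparkContext.getConf only returns a copy
    val conf = new SparkConf().registerKryoClasses(Array(cls))
    val spark = SparkSession.builder().config(conf).getOrCreate()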

    I use the first one and it works perfectly; I haven't tested the second one.
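To check that a registration actually covers the class from the error without starting a Spark job, you can exercise Kryo directly. This is a minimal sketch, assuming the Kryo library bundled with Spark (package com.esotericsoftware.kryo) and spark-catalyst are on the classpath. The object KryoRegistrationCheck and its helpers are hypothetical names:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.sql.catalyst.InternalRow
import scala.util.Try

object KryoRegistrationCheck {
  // Hypothetical helper: true if `cls` resolves on a Kryo that rejects unregistered classes
  def isRegistered(kryo: Kryo, cls: Class[_]): Boolean =
    Try(kryo.getRegistration(cls)).isSuccess

  // Hypothetical helper: a Kryo instance that mirrors spark.kryo.registrationRequired=true
  def newStrictKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(true)
    kryo
  }

  def main(args: Array[String]): Unit = {
    val before = newStrictKryo()
    // getRegistration throws "Class is not registered: ...InternalRow[]" here
    println(isRegistered(before, classOf[Array[InternalRow]])) // false

    val after = newStrictKryo()
    // The same call that MyRegistrator.registerClasses makes
    after.register(classOf[Array[InternalRow]])
    println(isRegistered(after, classOf[Array[InternalRow]]))  // true
  }
}
```

For Spark to call MyRegistrator from the first approach, set spark.kryo.registrator to the registrator's fully-qualified class name, for example with --conf on spark-submit.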