apache-kafka spark-streaming avro

Spark Dataset encoder for Avro-decoded Kafka messages


import spark.implicits._

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load()
  .as[KafkaMessage]
  .select($"value".as[Array[Byte]])
  .map(msg=>{

    val byteArrayInputStream = new ByteArrayInputStream(msg)
    val datumReader:DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
    val dataFileReader:DataFileStream[GenericRecord]  = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)

    while(dataFileReader.hasNext) {
      val userData1: GenericRecord = dataFileReader.next()

      userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
    }
  })

Error:

Error:(49, 9) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    .map(msg=>{


Solution

  • Whenever you apply map or another typed transformation to a Dataset in Structured Streaming, Spark needs an appropriate encoder for the result type.

    For primitive types and case classes (Product types), implicit encoders are provided by Spark:

    import spark.implicits._
    

    For other types, you need to provide an encoder yourself.

    So here you can either define an implicit Kryo-based encoder:

    import scala.reflect.ClassTag
    
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
    

    ... or you can define an encoder explicitly for the type that your map function returns.
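
A minimal sketch of the second option, under a few assumptions: each Kafka value is a complete Avro container file (which the DataFileStream in the question implies), and returning each record as its JSON text is acceptable. The names AvroDecode, decode, decodeStream and schemaJson are hypothetical and chosen only for illustration. Note that the map in the question ends in a while loop, so its result type is Unit, and spark.implicits._ has no encoder for Unit. Returning String values and passing Encoders.STRING explicitly sidesteps that. The sketch also swaps SpecificDatumReader for GenericDatumReader, since the records are read as GenericRecord.

```scala
import java.io.ByteArrayInputStream

import scala.collection.mutable.ListBuffer

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Hypothetical helper object; not part of Spark or Avro.
object AvroDecode {

  // Decodes one Kafka value into the JSON text of every record it contains.
  // The schema is passed as a JSON string and parsed here, so the closure
  // shipped to executors only captures a String.
  def decode(bytes: Array[Byte], schemaJson: String): Seq[String] = {
    val schema = new Schema.Parser().parse(schemaJson)
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val stream =
      new DataFileStream[GenericRecord](new ByteArrayInputStream(bytes), datumReader)
    try {
      val records = ListBuffer.empty[String]
      while (stream.hasNext) records += stream.next().toString
      records.toList
    } finally stream.close()
  }

  // Same Kafka source options as in the question; the encoder for the
  // result type (String) is passed explicitly instead of being resolved
  // implicitly.
  def decodeStream(spark: SparkSession, schemaJson: String): Dataset[String] = {
    import spark.implicits._ // needed for $"value" and the Array[Byte] encoder
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .load()
      .select($"value".as[Array[Byte]])
      .flatMap(bytes => decode(bytes, schemaJson))(Encoders.STRING)
  }
}
```

flatMap is used rather than map because one container file may hold several records. If you would rather keep the GenericRecord objects themselves in the Dataset, the Kryo encoder shown above is the other route, at the cost of storing each row as an opaque binary blob.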