import spark.implicits._

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load()
  .as[KafkaMessage]
  .select($"value".as[Array[Byte]])
  .map(msg => {
    val byteArrayInputStream = new ByteArrayInputStream(msg)
    val datumReader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
    val dataFileReader: DataFileStream[GenericRecord] = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)
    while (dataFileReader.hasNext) {
      val userData1: GenericRecord = dataFileReader.next()
      userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
    }
  })
Error:

Error:(49, 9) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    .map(msg => {
Whenever you do a map or any other typed transformation on a Dataset in Structured Streaming, Spark needs an encoder for the transformation's result type.
For primitive data types, implicit encoders are provided by Spark:
import spark.implicits._
Whereas for other types, you need to provide one explicitly.
So here you can either bring a generic Kryo-based implicit encoder into scope:
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
... or you can define an explicit encoder that matches the type returned by your map function.
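As a sketch of that second option, assuming the Avro records carry a string field named "name" (the field name and the surrounding `ds1` pipeline are assumptions taken from the question, not a confirmed schema), the map can return a concrete type and receive its encoder explicitly:

```scala
import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.spark.sql.Encoders

// Returning Seq[String] gives the map a concrete, serializable result type;
// the encoder is passed explicitly instead of relying on implicit resolution.
val decoded = ds1.map { msg: Array[Byte] =>
  val input = new ByteArrayInputStream(msg)
  val reader = new DataFileStream[GenericRecord](
    input, new GenericDatumReader[GenericRecord]())
  val values = scala.collection.mutable.ArrayBuffer.empty[String]
  while (reader.hasNext) {
    // "name" is a hypothetical field name -- replace with one from your schema
    values += reader.next().get("name").toString
  }
  reader.close()
  values.toSeq
}(Encoders.kryo[Seq[String]])
```

Alternatively, returning a case class from the map lets `import spark.implicits._` derive the encoder automatically, which is usually the more idiomatic choice.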