Search code examples
scaladeserializationapache-flinkavro

Generic Avro Deserializer for Flink : override getProducedType


I would like to create a Generic Avro Deserializer and use it with Kafka/Flink.

To do that, I have to extend DeserializationSchema from the Flink API:

import java.io.ByteArrayInputStream

import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor

/**
  * Generic deserializer that turns a binary Avro payload into a value of type `T`.
  *
  * NOTE: this version does not compile; `getProducedType` applies `classOf` to the
  * abstract type parameter `T`, which the compiler rejects.
  *
  * @param schema     implicit avro4s `SchemaFor[T]` evidence
  * @param toRecord   implicit avro4s `ToRecord[T]` evidence (not referenced explicitly in this class)
  * @param fromRecord implicit avro4s `FromRecord[T]` evidence
  * @tparam T type of the deserialized elements
  */
class MyGenericAvroDeserializer[T](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
  extends DeserializationSchema[T] {

  /** Never signals end of stream: returns `false` for every element. */
  override def isEndOfStream(nextElement: T): Boolean = false

  /** Decodes `message` as binary Avro and returns the first record read from it. */
  override def deserialize(message: Array[Byte]): T = {
    // `.head` throws if the iterator yields no record.
    AvroInputStream.binary[T](new ByteArrayInputStream(message)).iterator.toSeq.head
  }

  // Compilation fails here: `classOf` needs a concrete class type, but `T` is a type parameter.
  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
}

Doing this causes a compilation error because T does not seem to be a class:

class type required but T found
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])

Solution

  • I am answering my own question. I had to use a ClassTag and force the type with asInstanceOf, but it works now:

    import java.io.ByteArrayInputStream
    
    import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TypeExtractor
    
    import scala.reflect.ClassTag
    import scala.reflect._
    
    /**
      * Generic Flink `DeserializationSchema` that decodes a binary Avro payload into a `T`
      * through avro4s.
      *
      * @param schema     implicit avro4s `SchemaFor[T]` evidence (presumably picked up implicitly
      *                   by `AvroInputStream.binary` -- confirm against the avro4s version in use)
      * @param toRecord   implicit avro4s `ToRecord[T]` evidence; not referenced explicitly in this class
      * @param fromRecord implicit avro4s `FromRecord[T]` evidence (same assumption as `schema`)
      * @tparam T type of the deserialized elements; the `ClassTag` context bound makes its
      *           runtime class available to `getProducedType`
      */
    class MyGenericAvroDeserializer[T: ClassTag](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
      extends DeserializationSchema[T] {

      /** Never signals end of stream: returns `false` for every element. */
      override def isEndOfStream(nextElement: T): Boolean = false

      /**
        * Decodes `message` as binary Avro and returns the first record it contains.
        *
        * @param message raw bytes of a single message
        * @return the first record read from `message`
        * @throws NoSuchElementException if no record can be read from `message`
        */
      override def deserialize(message: Array[Byte]): T = {
        val input = AvroInputStream.binary[T](new ByteArrayInputStream(message))
        try {
          // Read the record eagerly, before the stream is closed in `finally`.
          val records = input.iterator
          if (records.hasNext) records.next()
          else throw new NoSuchElementException("No Avro record could be read from the message")
        } finally {
          input.close()
        }
      }

      /** Type information for `T`, derived from the runtime class carried by the `ClassTag`. */
      override def getProducedType: TypeInformation[T] = {
        // `runtimeClass` is typed `Class[_]`; narrowing it to `Class[T]` lets `getForClass`
        // return a `TypeInformation[T]` without casting the result.
        val producedClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
        TypeExtractor.getForClass(producedClass)
      }

    }