I'm getting the following error from the getProducedType
method in my Flink deserializer:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
Deserializer:
class AvroDeserializer[T <: GenericRecord : ClassTag](topic: String, schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {
@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}
// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}
override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}
override def isEndOfStream(nextElement: T): Boolean = false
override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}
}
From what I've read Kryo has some issues with array types which my message does have. If this is actually true then how can I deserialize my Kafka message into a GenericRecord?
Had this problem myself earlier. This is due to the fact that the Kryo parser can't serialize avro types correctly.
To fix this, you can include the flink-avro library to your project as detailed here: Avro support in Flink
After doing this, Flink should now use a special parser for avro types automatically.
If this isn't the case, you might consider trying to set the option enableForceAvro() as described in Execution Configuration