Tags: scala, apache-kafka, protocol-buffers, apache-flink, confluent-schema-registry

Deserialize Protobuf Kafka messages with Flink


I am trying to read and print Protobuf messages from Kafka using Apache Flink.

I followed the official docs with no success: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/

The Flink consumer code is:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer())
      .setStartingOffsets(OffsetsInitializer.earliest)
      .build

    val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
    stream.print()
    env.execute()
  }

The deserializer code is:

class ProtoDeserializer extends DeserializationSchema[User] {

  override def getProducedType: TypeInformation[User] = null

  override def deserialize(message: Array[Byte]): User = User.parseFrom(message)

  override def isEndOfStream(nextElement: User): Boolean = false
}

I get the following error when the job is executed:

Protocol message contained an invalid tag (zero).

It's important to mention that I can read and deserialize the messages successfully using the Confluent Protobuf consumer, so the messages don't seem to be corrupted.
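
For reference, a plain Kafka consumer using Confluent's KafkaProtobufDeserializer looks roughly like the sketch below; the Schema Registry URL and key deserializer are illustrative, not taken from the original setup:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Plain Kafka consumer using Confluent's deserializer, which understands
// the Schema Registry wire format described in the solution below.
val props = new Properties()
props.put("bootstrap.servers", brokers)    // same brokers as in the Flink job
props.put("group.id", consumerGroupId)
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
  "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer")
props.put("schema.registry.url", "http://localhost:8081")  // illustrative address
// Deserialize into the generated User class instead of a DynamicMessage.
props.put("specific.protobuf.value.type", classOf[User].getName)

val consumer = new KafkaConsumer[String, User](props)
consumer.subscribe(Collections.singletonList(topic))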


Solution

  • The Confluent Protobuf serializer doesn't produce output that other deserializers can consume directly. The wire format is described in Confluent's documentation: it starts with a magic byte (always zero), followed by a four-byte schema ID. For Protobuf, byte 5 then begins a varint-encoded list of message indexes, which in the common case (the message is the first type defined in its schema) is a single zero byte; the actual Protobuf payload begins only after that. A corrected deserializer is sketched below.

    The getProducedType method should return appropriate TypeInformation, in this case TypeInformation.of(classOf[User]), rather than null. Without this you may run into problems at runtime.

    Deserializers used with KafkaSource don't need to implement isEndOfStream, but implementing it won't hurt anything.
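
    Putting these points together, a corrected deserializer could look roughly like the following. This is a minimal sketch, not a drop-in implementation: it assumes User is the first (or only) message type defined in its .proto file, so the message-index list collapses to a single zero byte and the payload starts at byte 6. A robust version would decode the varint index list instead of hard-coding the offset, or simply wrap Confluent's own deserializer.

    import java.util.Arrays
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation

    class ConfluentProtoDeserializer extends DeserializationSchema[User] {

      // Return real type information instead of null so Flink can pick
      // a serializer for User values flowing between operators.
      override def getProducedType: TypeInformation[User] =
        TypeInformation.of(classOf[User])

      override def deserialize(message: Array[Byte]): User = {
        // Confluent wire format: byte 0 is the magic byte (always zero),
        // bytes 1-4 are the schema ID, and byte 5 starts the varint-encoded
        // message indexes (a single 0x00 in the common case assumed here),
        // so the Protobuf payload begins at byte 6.
        User.parseFrom(Arrays.copyOfRange(message, 6, message.length))
      }

      // Not used by KafkaSource, but required by the interface.
      override def isEndOfStream(nextElement: User): Boolean = false
    }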