Tags: apache-kafka, apache-flink, avro, flink-sql

How to deserialize Avro enum type in Flink SQL?


I have a Kafka topic whose schema is defined by the following Avro IDL and registered in the Schema Registry.

    @namespace("my.type.avro")
    protocol MyProtocol {
      enum MyEnumType {
        Type1, Type2
      }

      record MyEntry {
        MyEnumType myEntryType = "Type1";
      }

      record MyRecord {
        MyEntry entry;
      }
    }

To read from the topic, I've defined the following DDL:

    CREATE TABLE my_table (
      `entry` ROW<`myEntryType` ???>
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = '...:9092',
        'scan.startup.mode' = 'latest-offset',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.schema-registry.url' = 'http://...:8081'
    )

And I run the following query:

    SELECT * FROM my_table

With Flink 1.13.1, I get the following error when I use STRING for the enum type:

    Caused by: java.io.IOException: Failed to deserialize Avro record.
      at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
      at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
      at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
      at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
      at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
      at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
    Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
      at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
      at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
      at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:1
    ...

I've also tried RAW('class', 'snapshot'), where 'class' is my.type.avro.MyEnumType, but I can't find a suitable snapshot serializer. I've tried a bunch of them, e.g. PojoSerializerSnapshot, KryoSerializer.KryoSerializerConfigSnapshot, StringSerializer, AvroSerializer, etc., but none of them worked.


Solution

  • True, the current workaround is to use the DataStream API to read the data, providing a custom Avro schema to configure the format, and then switch to the Table API, as described in the following thread: https://www.mail-archive.com/user@flink.apache.org/msg44520.html (see the sketch after this list).

    Also, the following Jira ticket has been opened to track enum support in the Table API: https://issues.apache.org/jira/browse/FLINK-24544
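
For reference, here is a minimal sketch of that workaround, assuming Flink 1.13.x with the flink-avro-confluent-registry and Kafka connector dependencies on the classpath. The class name is made up, the inline reader schema mirrors the IDL above, and the topic name and addresses are the same placeholders as in the DDL:

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class AvroEnumWorkaround {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hand-written reader schema; unlike the SQL DDL, plain Avro can
            // express the enum directly.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\",\"namespace\":\"my.type.avro\",\"fields\":["
              + "{\"name\":\"entry\",\"type\":{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
              + "{\"name\":\"myEntryType\",\"type\":{\"type\":\"enum\",\"name\":\"MyEnumType\","
              + "\"symbols\":[\"Type1\",\"Type2\"]},\"default\":\"Type1\"}]}}]}");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "...:9092"); // placeholder, as above

            FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://...:8081"),
                props);

            // Read GenericRecords and flatten the enum to a String. From here
            // the stream can be handed to the Table API, e.g. via
            // StreamTableEnvironment.fromDataStream(...).
            DataStream<String> entryTypes = env
                .addSource(consumer)
                .map(record -> {
                    GenericRecord entry = (GenericRecord) record.get("entry");
                    return entry.get("myEntryType").toString();
                })
                .returns(Types.STRING); // help Flink's type extraction with the lambda

            entryTypes.print();
            env.execute("avro-enum-workaround");
        }
    }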