scala, apache-flink, avro

AvroTypeException: Not an enum: MOBILE on DataFileWriter


I am getting the error below when I try to write Avro records using the built-in AvroKeyValueSinkWriter in Flink 1.3.2 with Avro 1.8.2.

My schema looks like this:

{"namespace": "com.base.avro",
 "type": "record",
 "name": "Customer",
 "doc": "v6",
 "fields": [
     {"name": "CustomerID", "type": "string"},
     {"name": "platformAgent", "type": {
       "type": "enum",
       "name": "PlatformAgent",
       "symbols": ["WEB", "MOBILE", "UNKNOWN"]
       }, "default":"UNKNOWN"}
 ]
}
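For context, "Not an enum: MOBILE" is typically raised when the datum supplied for an enum field is not the type the writer expects (e.g. a plain string, or a generated Java enum handed to a generic writer, instead of a GenericEnumSymbol). A minimal sketch of writing this schema's enum field through the Avro generic API (assumes Avro is on the classpath; EnumWriteSketch is a hypothetical name):

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

object EnumWriteSketch {
  // Same record/enum shape as the Customer schema above.
  val schemaJson: String =
    """{"namespace": "com.base.avro", "type": "record", "name": "Customer",
      | "fields": [
      |   {"name": "CustomerID", "type": "string"},
      |   {"name": "platformAgent",
      |    "type": {"type": "enum", "name": "PlatformAgent",
      |             "symbols": ["WEB", "MOBILE", "UNKNOWN"]},
      |    "default": "UNKNOWN"}
      | ]}""".stripMargin

  def serialize(): Array[Byte] = {
    val schema = new Schema.Parser().parse(schemaJson)
    val enumSchema = schema.getField("platformAgent").schema()

    val record = new GenericData.Record(schema)
    record.put("CustomerID", "c-1")
    // The generic writer expects a GenericEnumSymbol here; passing a
    // bare string or Java enum can trigger "Not an enum: MOBILE".
    record.put("platformAgent", new GenericData.EnumSymbol(enumSchema, "MOBILE"))

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def main(args: Array[String]): Unit =
    println(s"wrote ${serialize().length} bytes")
}
```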

And I am calling the following Flink code to write data:

    import java.util

    import org.apache.avro.Schema
    import org.apache.avro.Schema.Type
    import org.apache.avro.file.DataFileConstants
    import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

    import com.base.avro.Customer

    val properties = new util.HashMap[String, String]()
    val stringSchema = Schema.create(Type.STRING)
    val myTypeSchema = Customer.getClassSchema
    val keySchema = stringSchema.toString
    val valueSchema = myTypeSchema.toString

    val compress = true
    properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
    properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
    properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
    properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)

    val sink = new BucketingSink[org.apache.flink.api.java.tuple.Tuple2[String, Customer]]("s3://test/flink")
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"))
    sink.setInactiveBucketThreshold(120000) // this is 2 minutes
    sink.setBatchSize(1024 * 1024 * 64) // this is 64 MB
    sink.setPendingSuffix(".avro")

    val writer = new AvroKeyValueSinkWriter[String, Customer](properties)
    sink.setWriter(writer.duplicate())

However, it throws the following error:

Caused by: org.apache.avro.AvroTypeException: Not an enum: MOBILE
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:177)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:119)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:302)
    ... 10 more

Please suggest!

UPDATE 1: This appears to be a bug in Avro 1.8+, based on this ticket: https://issues-test.apache.org/jira/browse/AVRO-1810


Solution

  • It turns out this is an issue with Avro 1.8+. I had to override the version Flink uses with dependencyOverrides += "org.apache.avro" % "avro" % "1.7.3". The bug is tracked here: https://issues-test.apache.org/jira/browse/AVRO-1810
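For reference, in an sbt build the override is a one-line build.sbt fragment (a sketch; pin whichever 1.7.x release matches your environment):

```scala
// build.sbt (fragment): force the transitive Avro dependency pulled in
// by Flink down to a 1.7.x release to avoid the AVRO-1810 behavior.
dependencyOverrides += "org.apache.avro" % "avro" % "1.7.3"
```

With the override in place, sbt replaces Flink's transitive Avro 1.8.x artifact at dependency-resolution time, so no source changes are needed.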