Tags: java, stream, apache-kafka, apache-flink, avro

Apache Flink read Avro byte[] from Kafka


In reviewing examples I see a lot of this:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

Here they already know the schema.

I do not know the schema until I read the byte[] into a GenericRecord and then get the schema from it (it may change from record to record).

Can someone point me to a FlinkKafkaConsumer08 example that reads the raw byte[] into a map or filter step, so that I can remove some leading bits and then load that byte[] into a GenericRecord?


Solution

  • I'm doing something similar (I'm using the 09 consumer)

    In your main code pass in your custom deserializer:

    FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                    parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                    parameterTool.getProperties());
    

    The custom deserialization schema reads the bytes, works out the schema (or retrieves it from a schema registry), deserializes the bytes into a GenericRecord, and returns that GenericRecord.

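    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    // Package used by the older Flink releases; newer releases moved this
    // interface to org.apache.flink.api.common.serialization.
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

        // The produced type is always GenericRecord; the cast keeps the class generic.
        @SuppressWarnings("unchecked")
        private final Class<T> avrotype = (Class<T>) GenericRecord.class;

        @Override
        @SuppressWarnings("unchecked")
        public T deserialize(byte[] message) throws IOException {
            // Do your stuff here: strip off your leading bytes,
            // then deserialize the rest and create your GenericRecord.
            // myavroevent stands for the GenericRecord built by those steps.
            return (T) (myavroevent);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            // A Kafka topic is unbounded, so never signal end of stream.
            return false;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            // Tells Flink which type this schema emits.
            return TypeExtractor.getForClass(avrotype);
        }

    }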
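
The "strip off your bytes" step could look roughly like the sketch below. It is only an illustration under an assumed framing: one magic byte (0) followed by a 4-byte big-endian schema id, which is the layout Confluent's schema registry serializers use. The question does not say what the leading bits actually are, so adjust the header length and checks to your own format. `FramedMessage` is a hypothetical helper name, not part of Flink or Avro.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of Flink or Avro): splits a framed message
// into its schema id and the remaining Avro payload.
final class FramedMessage {
    // Assumed framing: 1 magic byte, then a 4-byte big-endian schema id.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES;

    final int schemaId;
    final byte[] payload;

    private FramedMessage(int schemaId, byte[] payload) {
        this.schemaId = schemaId;
        this.payload = payload;
    }

    static FramedMessage parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("message shorter than the assumed header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload); // copy everything after the header
        return new FramedMessage(schemaId, payload);
    }
}
```

Inside `deserialize` you would then call `FramedMessage.parse(message)`, look up the writer schema for `schemaId` (from a registry or a local cache), and decode `payload` with Avro, for example with a `GenericDatumReader<GenericRecord>` built from that schema and `DecoderFactory.get().binaryDecoder(payload, null)`.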