
How to use FlinkKafkaConsumer to parse key separately <K,V> instead of <T>


From what I can tell, Flink's Avro deserialization lets you create a stream of Avro objects, and that's fine, but there seems to be an issue: Flink's Kafka consumer (FlinkKafkaConsumerBase&lt;T&gt;) only produces streams of a single object type T, as opposed to the default Kafka API with its KafkaConsumer&lt;K, V&gt;.

In my case both the key and the value are separate Avro-schema-compliant objects, and merging their schemas might be a nightmare...

Additionally, it seems that with the Flink API I can't retrieve the ConsumerRecord information (topic, partition, offset)?...
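
For reference, this is roughly what the plain Kafka client API gives you: key and value deserialised separately, with the full record metadata available. MyKey / MyValue here stand in for hypothetical Avro-generated classes, and the deserializer configuration in props is omitted:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    class PlainKafkaExample {
      // props must carry bootstrap.servers, group.id and the key/value deserializers
      static void consume(Properties props) {
        try (KafkaConsumer<MyKey, MyValue> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("my-topic"));
          ConsumerRecords<MyKey, MyValue> records = consumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord<MyKey, MyValue> record : records) {
            MyKey key = record.key();        // key deserialised on its own
            MyValue value = record.value();  // value deserialised on its own
            long offset = record.offset();   // full ConsumerRecord metadata available
          }
        }
      }
    }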


Solution

  • Looking at the Flink Kafka consumer, there is a constructor:

    public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }
    

    The second parameter, KeyedDeserializationSchema, is used to deserialise the Kafka record, and it gives you access to the message key, message value, offset, topic, etc. So you can implement your own type, say MyKafkaRecord, that holds both the Avro key and the Avro value, and use it as T in your implementation of KeyedDeserializationSchema. Refer to TypeInformationKeyValueSerializationSchema as an example.

    E.g. reading the extra record info from Kafka:

    class KafkaRecord<K, V> {
      // package-private so MySchema can populate them; getters/setters omitted
      K key;
      V value;
      long offset;
      int partition;
      String topic;
    }
    
    class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
      private final DeserializationSchema<K> keyDeserialiser;    // e.g. an Avro decoder
      private final DeserializationSchema<V> valueDeserialiser;  // e.g. an Avro decoder
      private final TypeInformation<KafkaRecord<K, V>> producedType;
    
      MySchema(DeserializationSchema<K> keyDeserialiser, DeserializationSchema<V> valueDeserialiser,
               TypeInformation<KafkaRecord<K, V>> producedType) {
        this.keyDeserialiser = keyDeserialiser;
        this.valueDeserialiser = valueDeserialiser;
        this.producedType = producedType;
      }
    
      @Override
      public KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message,
                                           String topic, int partition, long offset) throws IOException {
        KafkaRecord<K, V> rec = new KafkaRecord<>();
        rec.key = keyDeserialiser.deserialize(messageKey);   // key decoded on its own
        rec.value = valueDeserialiser.deserialize(message);  // value decoded on its own
        rec.topic = topic;
        rec.partition = partition;
        rec.offset = offset;
        return rec;
      }
    
      @Override
      public boolean isEndOfStream(KafkaRecord<K, V> nextElement) { return false; }
      @Override
      public TypeInformation<KafkaRecord<K, V>> getProducedType() { return producedType; }
    }
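    
    Wiring it together might look something like the sketch below. It assumes Flink's AvroDeserializationSchema from the flink-avro module; MyKey and MyValue are placeholders for your Avro-generated specific record classes:
    
    import java.util.Properties;
    
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    public class KeyedAvroJob {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
    
        // MyKey / MyValue are placeholders for Avro-generated specific records
        MySchema<MyKey, MyValue> schema = new MySchema<>(
            AvroDeserializationSchema.forSpecific(MyKey.class),
            AvroDeserializationSchema.forSpecific(MyValue.class),
            TypeInformation.of(new TypeHint<KafkaRecord<MyKey, MyValue>>() {}));
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<KafkaRecord<MyKey, MyValue>> stream =
            env.addSource(new FlinkKafkaConsumer<>("my-topic", schema, props));
    
        stream.print();  // each element carries key, value, topic, partition, offset
        env.execute("keyed-avro-consumer");
      }
    }
    
    Each element of the resulting stream then carries the typed key and value separately, along with the topic/partition/offset metadata that the plain KafkaConsumer would expose.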