KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord


My KafkaProducer uses KafkaAvroSerializer to serialize objects to my topic without problems. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of an instance of my serialized class.

My KafkaProducer

    KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
    }

My KafkaConsumer

    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
          MyBean bean = record.value(); // throws ClassCastException: GenericRecord cannot be cast to MyBean
          System.out.println("consumer received: " + bean);
        }
      }
    } finally {
      consumer.close();
    }

The line MyBean bean = record.value(); throws a ClassCastException because a GenericRecord cannot be cast to MyBean.

I'm using kafka-clients-0.9.0.1 and kafka-avro-serializer-3.0.0.


Solution

  • KafkaAvroDeserializer supports SpecificData

    It's not enabled by default. To enable it:

    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
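
A minimal sketch of the full set of consumer settings with the specific reader enabled. It uses plain string keys so that it compiles without the Kafka or Confluent jars on the classpath. The literal "specific.avro.reader" is the string value behind KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG; the class name SpecificReaderProps and the method name build are hypothetical.

```java
import java.util.Properties;

public class SpecificReaderProps {

  // Builds consumer properties equivalent to the ones in the question,
  // plus the flag that asks KafkaAvroDeserializer for SpecificRecord instances.
  static Properties build(String schemaRegistryUrl) {
    Properties properties = new Properties();
    properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    properties.put("schema.registry.url", schemaRegistryUrl);
    // String value of KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
    properties.put("specific.avro.reader", "true");
    return properties;
  }

  public static void main(String[] args) {
    Properties properties = build("http://localhost:8081");
    System.out.println(properties.getProperty("specific.avro.reader"));
  }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in place of the one built in the question. The flag is meant for classes generated from an Avro schema; a plain Java class read through ReflectData is a different case.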
    

    KafkaAvroDeserializer does not support ReflectData

    Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.Schema;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.reflect.ReflectDatumReader;
    import org.apache.kafka.common.errors.SerializationException;

    /**
     * Extends deserializer to support ReflectData.
     *
     * @param <V>
     *     value type
     */
    public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {
    
      private Schema readerSchema;
      private DecoderFactory decoderFactory = DecoderFactory.get();
    
      protected ReflectKafkaAvroDeserializer(Class<V> type) {
        readerSchema = ReflectData.get().getSchema(type);
      }
    
      @Override
      protected Object deserialize(
          boolean includeSchemaAndVersion,
          String topic,
          Boolean isKey,
          byte[] payload,
          Schema readerSchemaIgnored) throws SerializationException {
    
        if (payload == null) {
          return null;
        }
    
        int schemaId = -1;
        try {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          if (buffer.get() != MAGIC_BYTE) {
            throw new SerializationException("Unknown magic byte!");
          }
    
          schemaId = buffer.getInt();
          Schema writerSchema = schemaRegistry.getByID(schemaId);
    
          int start = buffer.position() + buffer.arrayOffset();
          int length = buffer.limit() - 1 - idSize;
          DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
          BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
          return reader.read(null, decoder);
        } catch (IOException e) {
          throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
        } catch (RestClientException e) {
          throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
        }
      }
    }
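
The header handling in the deserializer above follows Confluent's wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary payload. Below is a stdlib-only sketch of just that framing step, so the offset arithmetic can be checked in isolation. The names WireFormatHeader, schemaId and payloadLength are hypothetical, and no schema registry or Avro call is involved.

```java
import java.nio.ByteBuffer;

public class WireFormatHeader {
  static final byte MAGIC_BYTE = 0x0; // first byte of every Confluent-framed message
  static final int ID_SIZE = 4;       // the schema id is a 4-byte big-endian int

  // Reads the schema id, failing on an unexpected magic byte.
  static int schemaId(byte[] message) {
    ByteBuffer buffer = ByteBuffer.wrap(message);
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte!");
    }
    return buffer.getInt();
  }

  // Number of Avro payload bytes that follow the 5-byte header.
  static int payloadLength(byte[] message) {
    return message.length - 1 - ID_SIZE;
  }

  public static void main(String[] args) {
    // Frame a fake 3-byte payload under schema id 42.
    byte[] message = ByteBuffer.allocate(1 + ID_SIZE + 3)
        .put(MAGIC_BYTE).putInt(42).put(new byte[] {1, 2, 3}).array();
    System.out.println(schemaId(message));      // prints 42
    System.out.println(payloadLength(message)); // prints 3
  }
}
```

The payload length here matches the expression buffer.limit() - 1 - idSize in the deserializer: the total message size minus the magic byte and the schema id.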
    

    Define a custom deserializer class which deserializes to MyBean:

    public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
      public MyBeanDeserializer() {
        super(MyBean.class);
      }
    }
    

    Configure KafkaConsumer to use the custom deserializer class:

    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);