Search code examples
javaserializationdeserializationapache-kafkaavro

KafkaAvroDeserializer Issue


I am trying to desrialize a Kafka Avro message using Avro Kafka Deserializer. This code is very common and already in practice by lot of users. But I am facing few difficulties in implementing the same: Code:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
        @SuppressWarnings("unchecked")
        ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000);  //1

       for (ConsumerRecord<String, ATPEvent> record: records)  //2 {

            try {
                kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
                avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
                avroEvent = reader.read(avroEvent, avroBinaryDecoder);
                System.out.println(avroEvent);
                kafkaMessageInputStream.close();
              } catch (Exception ex) {
                System.out.println("Unable to process event from kafka, see exception details" + ex);
              }
        }
        consumer.commitSync(); //4
}

Now, here are the 4 issue:

  1. I have to add cast, otherwise it will through an error as Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent>

  2. Can only iterate over an array or an instance of java.lang.Iterable I have no clue why this? Can I do this? :

    List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);
    
    for (ConsumerRecord<String, ATPEvent> record: records) {
    
  3. The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent) This I understand, but how can I convert a class to a byte, is there any other way?

  4. The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent> Can I just use consumer.close();

Please provide solution for 2 & 3, and if possible explanation for 1 & 4.


Solution

  • Which version of Kafka do you use? There are differences for 0.8.x and 0.9.x

    Kafka 0.8.x:

    1. return type is Map<String, ConsumerRecords> (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
    2. use ConsumerRecords#records(...) to get List<ConsumerRecord> for iteration (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecords.html)
    3. ConsumerRecord.value() return byte[] (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecord.html#value%28%29)
    4. Use commit(boolen) (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commit%28boolean%29) -- commitSync() only available in 0.9.x

    Kafka 0.9.x

    1. return type is ConsumerRecords (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
    2. ConsumerRecords implements Iterable (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) Thus, you can use for(ConsumerRecord r : records)
    3. ConsumerRecord#value returns T (with T == ATPEvent in your case) (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
    4. commitSync() available in 0.9.x