I am trying to desrialize a Kafka Avro message using Avro Kafka Deserializer. This code is very common and already in practice by lot of users. But I am facing few difficulties in implementing the same: Code:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
@SuppressWarnings("unchecked")
ConsumerRecords<String, ATPEvent> records = (ConsumerRecords<String, ATPEvent>) consumer.poll(1000); //1
for (ConsumerRecord<String, ATPEvent> record: records) //2 {
try {
kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(record.value()))); //3
avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
avroEvent = reader.read(avroEvent, avroBinaryDecoder);
System.out.println(avroEvent);
kafkaMessageInputStream.close();
} catch (Exception ex) {
System.out.println("Unable to process event from kafka, see exception details" + ex);
}
}
consumer.commitSync(); //4
}
Now, here are the 4 issue:
I have to add cast, otherwise it will through an error as
Type mismatch: cannot convert from Map<String,ConsumerRecords<String,ATPEvent>> to ConsumerRecords<String,ATPEvent>
Can only iterate over an array or an instance of java.lang.Iterable
I have no clue why this? Can I do this? :
List<ConsumerRecord<String, ATPEvent>> records = (List<ConsumerRecord<String, ATPEvent>>) consumer.poll(1000);
for (ConsumerRecord<String, ATPEvent> record: records) {
The method wrap(byte[]) in the type ByteBuffer is not applicable for the arguments (ATPEvent)
This I understand, but how can I convert a class to a byte, is there any other way?
The method commitSync() is undefined for the type KafkaConsumer<String,ATPEvent>
Can I just use consumer.close();
Please provide solution for 2 & 3, and if possible explanation for 1 & 4.
Which version of Kafka do you use? There are differences for 0.8.x and 0.9.x
Kafka 0.8.x:
Map<String, ConsumerRecords>
(see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)ConsumerRecords#records(...)
to get List<ConsumerRecord>
for iteration (see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecords.html)ConsumerRecord.value()
return byte[]
(see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/ConsumerRecord.html#value%28%29)commit(boolen)
(see http://www.webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commit%28boolean%29) -- commitSync()
only available in 0.9.xKafka 0.9.x
ConsumerRecords
(see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)ConsumerRecords
implements Iterable
(see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) Thus, you can use for(ConsumerRecord r : records)
ConsumerRecord#value
returns T
(with T == ATPEvent
in your case) (see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)commitSync()
available in 0.9.x