Search code examples
java apache-kafka avro kafka-consumer-api kafka-producer-api

Apache Kafka and Avro: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer


Whenever I try to read a message from the Kafka queue, I get the following exception:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Kafka Producer Code:

/**
 * Publishes a single {@code Customer} record to the {@code CustomerSpecificCountry} topic.
 *
 * <p>Keys and values are both serialized with {@code KafkaAvroSerializer}, which is
 * configured with the schema registry at {@code http://localhost:8081}.
 */
public class AvroSpecificProducer {
    private static final Properties kafkaProps = new Properties();
    private static final KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    /**
     * Sends the record without registering a callback; the outcome of the send is not observed.
     *
     * @param record the record to publish
     */
    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    /**
     * Sends the record asynchronously and reports the outcome from the completion callback.
     *
     * <p>On success the offset, topic, partition and timestamp are printed to standard output.
     * On failure the error is printed to standard error instead.
     *
     * @param record the record to publish
     */
    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            // Check the exception first: on failure the metadata carries no usable values
            // (and may be null, depending on the client version), so it must not be read.
            if (ex != null) {
                System.err.println("Failed to send record to topic " + record.topic() + ": " + ex);
                return;
            }
            System.out.println("Offset: "+ recordMetaData.offset());
            System.out.println("Topic: "+ recordMetaData.topic());
            System.out.println("Partition: "+ recordMetaData.partition());
            System.out.println("Timestamp: "+ recordMetaData.timestamp());
        });
    }

    /**
     * Builds one {@code Customer} record and publishes it.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility
     * @throws IOException kept for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );

        try {
            asyncSend(record1);
        } finally {
            // close() waits for in-flight sends to complete, so the callback has run before
            // the JVM exits; this replaces the previous fixed one-second sleep.
            kafkaProducer.close();
        }
    }
}

Kafka Consumer Code:

/**
 * Reads Avro-encoded messages from the {@code NewTopic} topic through the high-level
 * consumer connector and prints each key together with its {@code Customer} value.
 */
public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        // Connection endpoints.
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        // Consumer group identity.
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    }

    /**
     * Opens one stream on {@code NewTopic} and prints every message it yields.
     *
     * <p>Both key and value are decoded with {@code KafkaAvroDecoder}. The decoded value is
     * treated as a {@code GenericRecord} and deep-copied into a {@code Customer} using the
     * {@code Customer.SCHEMA$} schema.
     *
     * @throws IOException declared by the method signature
     */
    public static void infiniteConsumer() throws IOException {
        final String topic = "NewTopic";

        // Ask for a single stream on the topic.
        Map<String, Integer> streamsPerTopic = new HashMap<>();
        streamsPerTopic.put(topic, 1);

        VerifiableProperties decoderProps = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(decoderProps);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(decoderProps);

        ConsumerConnector connector = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> streamsByTopic =
                connector.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);

        KafkaStream<Object, Object> firstStream = streamsByTopic.get(topic).get(0);
        ConsumerIterator<Object, Object> messages = firstStream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (messages.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata<Object, Object> envelope = messages.next();
            String key = (String) envelope.key();
            GenericRecord genericValue = (GenericRecord) envelope.message();
            // Convert the generic record into the generated Customer class.
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, genericValue);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }

    }

    /**
     * Entry point: starts the consumer loop.
     *
     * @param args unused
     * @throws IOException propagated from {@link #infiniteConsumer()}
     */
    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}

I am following these examples:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java

Solution

  • This is the final code that works, after discussing with @harmeen:

    static {
        // Connection endpoints.
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");

        // Consumer group identity and starting position ("smallest" = beginning of the topic).
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");

        // Enable the specific Avro reader for the Avro decoder.
        kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    }
    
    public static void infiniteConsumer() throws IOException { 
    
    VerifiableProperties properties = new VerifiableProperties(kafkaProps); 
    StringDecoder keyDecoder = new StringDecoder(properties); 
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties); 
    
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put("BrandNewTopics", 1); 
    
    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps)); 
    Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); 
    
    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0); 
    ConsumerIterator it = stream.iterator(); 
    
    while (it.hasNext()) { 
        MessageAndMetadata messageAndMetadata = it.next(); 
        String key = (String) messageAndMetadata.key(); 
        GenericRecord record = (GenericRecord) messageAndMetadata.message(); 
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record); 
        System.out.println("Key: " + key); 
        System.out.println("Value: " + customer); 
    } 
    

    Things that changed:

    • Setting the SPECIFIC_AVRO_READER_CONFIG property to true.
    • Setting AUTO_OFFSET_RESET_CONFIG to smallest to start from the beginning of the topic.
    • Using StringSerializer and StringDeserializer for keys.
    • Changing both the producer and the consumer to reflect the previous change.
    • Adjusting the namespace of the Customer class that represents the Avro record.