Search code examples
javaapache-kafkaavroclasscastexceptionconfluent-schema-registry

Kafka : ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class


I have this exception in the consumer when trying to cast the record.value() into java object :

ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast  to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')

The producer sends the java object, which is a user defined type named PublicActivityRecord, like this :

KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());

[...]

    this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
    this.producer.flush();

At this point I can see in debug mode that the value of the ProducerRecord is indeed of type PublicActivityRecord.

On the registry server I can see in the log the POST request of the producer sending the schema :

Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)

On the consumer side :

protected KafkaConsumer<String, PublicActivityRecord> consumer;

[...]

consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {

    [...]

    PublicActivityRecord activityRecord = record.value();

Here the ClassCastException occurs.

In debug mode, I can see that the record.value is indeed of type GenericData$Record. And it can not be cast to PublicActivityRecord.

The serializer/deserilizer keys and values are the same :

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

And in the schema-registry log, I can see the GET request of the consumer :

"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)

So I have checked that :

  1. the producer sends a message with my own type PublicActivityRecord
  2. the message is received in the kafka broker
  3. the producer posts the schema to the schema registry
  4. the message is taken by the consumer
  5. the schema is GET by the consumer from the schema registry
  6. the value of the message is of the unexpected GenericData$Record

This leads me to the result that what is wrong is in my consumer.

So the question is : Why do the consumer get a GenericData record instead of the expected PublicActivityRecord ?

Any clue would be much appreciated !


Solution

  • By default, only generic records are returned. You'll need to set

    value.deserializer.specific.avro.reader=true 
    

    Or, use the constant in your consumer configs

    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG = true