Search code examples
spring-bootapache-kafkaavroconfluent-schema-registrydebezium

Avro class cannot be cast to class org.springframework.messaging.Message


I am consuming Avro data coming from Debezium

I made the kafka consumer as follows:

  1. The Java POJO

    import lombok.Data;
    
    @Data
    public class Shop {
    
     Long shopId;
     Double latitude, longitude;
     String name;
     String phoneNumber;
     String placeId;
     double rating;
     String website;
     int addressId;
     String closingHours;
     String email;
     int maxAttendance;
     String opening_hours;
     String businessHours;
     String closeDay;
     String description;
     boolean open;
     String setWeekendBusinessHours;
     Long userShopId;
    }
    
  2. Avro Message Format

     {
         "type": "record",
         "name": "ShopMessage",
         "namespace": "com.example.kafka.avro",
         "fields": [
         {
          "name": "SHOP_ID",
          "type": [
            "null",
            "long"
          ],
          "default": null
         },
         {
          "name": "LATITUDE",
          "type": [
            "null",
            "double"
          ],
          "default": null
         },
         {
          "name": "LONGITUDE",
          "type": [
            "null",
            "double"
          ],
          "default": null
         },
         {
          "name": "NAME",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "PHONENUMBER",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "PLACEID",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "RATING",
          "type": [
            "null",
            "double"
          ],
          "default": null
         },
         {
          "name": "WEBSITE",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "ADDRESSID",
          "type": [
            "null",
            "int"
          ],
          "default": null
         },
         {
          "name": "CLOSINGHOUR",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "EMAIL",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "MAXATTENDANCE",
          "type": [
            "null",
            "int"
          ],
          "default": null
         },
         {
          "name": "OPENINGHOURS",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "BUSINESSHOURS",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "CLOSEDAY",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "DESCRIPTION",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "ISOPEN",
          "type": [
            "null",
            "boolean"
          ],
          "default": null
         },
         {
          "name": "WEEKENDBUSINESSHOURS",
          "type": [
            "null",
            "string"
          ],
          "default": null
         },
         {
          "name": "USERSHOPID",
          "type": [
            "null",
            "long"
          ],
          "default": null
         }
        ]
       }
  1. ShopConsumer
@Component
public class ShopConsumer {

    private final ShopMapper shopMapper;
    private final Logger log = LogManager.getLogger(ShopConsumer.class);

    public ShopConsumer(ShopMapper shopMapper) {
        this.shopMapper = shopMapper;
    }

    @KafkaListener(
            groupId = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.consumer.topic}"
    )
    public void listen(List<Message<ShopMessage>> messages, Acknowledgment ack){
        log.info("Received batch of messages with size: {}", messages.size());
        List<Shop> shops = messages.stream()
                .peek(this::logMessageReceived)
                .map(message -> shopMapper.toChange(message.getPayload()))
                .collect(Collectors.toList());

        //do remove redis cache
        ack.acknowledge();
    }

    private void logMessageReceived(Message<ShopMessage> message) {
        log.info("Received shopId {} with a name of '{} and place id {}', partition={}, offset={}",
                message.getPayload().getSHOPID(),
                message.getPayload().getNAME(),
                message.getPayload().getPLACEID(),
                message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
                message.getHeaders().get(KafkaHeaders.OFFSET));
    }

  1. Consumer Config - ShopConsumerConfig.java
@EnableKafka
@Configuration
public class ShopsConsumerConfig {

    private final KafkaProperties kafkaProperties;

    public ShopsConsumerConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ShopMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ShopMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, ShopMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class);

        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getProperties().get("schema-registry-url"));
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, ShopMessage.class);
        return props;
    }

}

  1. Schema Deserializer
public class SpecificAvroWithSchemaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

    public static final String AVRO_KEY_RECORD_TYPE = "avro.key.record.type";
    public static final String AVRO_VALUE_RECORD_TYPE = "avro.value.record.type";

    private Schema readerSchema;

    public SpecificAvroWithSchemaDeserializer() { }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.configure(new KafkaAvroDeserializerConfig(configs));
        readerSchema = getSchema(getRecordClass(configs, isKey));
    }

    private Class<?> getRecordClass(Map<String, ?> configs, boolean isKey) {
        String configsKey = isKey ? AVRO_KEY_RECORD_TYPE : AVRO_VALUE_RECORD_TYPE;
        Object configsValue = configs.get(configsKey);

        if (configsValue instanceof Class) {
            return (Class<?>) configsValue;
        } else if (configsValue instanceof String) {
            String recordClassName = (String) configsValue;
            try {
                return Class.forName(recordClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(String.format("Unable to find the class '%s'", recordClassName));
            }
        } else {
            throw new IllegalArgumentException(
                    String.format("A class or a string must be informed into ConsumerConfig properties: '%s' and/or '%s'",
                            AVRO_KEY_RECORD_TYPE, AVRO_VALUE_RECORD_TYPE));
        }
    }

    private Schema getSchema(Class<?> targetType) {
        try {
            Field field = targetType.getDeclaredField("SCHEMA$");
            return (Schema) field.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalArgumentException(
                    String.format("Unable to get Avro Schema from the class '%s'", targetType.getName()), e);
        }
    }

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return super.deserialize(bytes, readerSchema);
    }

    @Override
    public void close() {
    }


}
  1. Mapper Class
@Mapper(componentModel = "spring")
public interface ShopMapper {

    default Shop toChange(ShopMessage shopMessage){
        if(shopMessage == null){
            return null;
        }
        Shop shop = new Shop();
        shop.setDescription(shopMessage.getDESCRIPTION().toString());
        shop.setMaxAttendance(shopMessage.getMAXATTENDANCE());
        shop.setSetWeekendBusinessHours(shopMessage.getWEEKENDBUSINESSHOURS().toString());
        shop.setOpen(shopMessage.getISOPEN());
        shop.setWebsite(shopMessage.getWEBSITE().toString());
        shop.setRating(shopMessage.getRATING());
        shop.setLatitude(shopMessage.getLATITUDE());
        shop.setLongitude(shopMessage.getLONGITUDE());
        shop.setCloseDay(shopMessage.getCLOSEDAY().toString());
        shop.setBusinessHours(shopMessage.getBUSINESSHOURS().toString());
        shop.setPhoneNumber(shopMessage.getPHONENUMBER().toString());
        shop.setEmail(shopMessage.getEMAIL().toString());
        shop.setPlaceId(shopMessage.getPLACEID().toString());

        return shop;
    }

}

Configuration is present on the application.properties file but during message consumption, Spring throws me an error of

Caused by: java.lang.ClassCastException: class com.example.kafka.avro.ShopMessage cannot be cast to class org.springframework.messaging.Message (com.example.kafka.avro.ShopMessage and org.springframework.messaging.Message are in unnamed module of loader 'app')

Could someone give me a correct direction to fix this issue, please? Looks like casting from POJO from Avro is having the issue but I am not able to find the root.

Thanks in advance.


Update

After few attempt, it looks that the issue on the above error is due to casting from a single message to list of messages. I changed the listener function as below.

public void listen(ConsumerRecord<Integer ,?> messages, Acknowledgment ack){
        //log.info("Received batch of messages with size: {}", messages.size());
        log.info(messages.key());
        log.info(messages.value());
  
        ack.acknowledge();
    }

and getting a value from Kafka topic.

{"before": {"id": 6, "latitude": 2.921318, "longitude": 101.655938, "name": "XYZ", "phone_number": "+12345678", "place_id": "P007", "rating": 5.0, "type": "Food", "website": "https://xyz.me", "address_id": 5, "closing_hours": null, "email": "[email protected]", "max_attendance": 11, "opening_hours": null, "business_hours": "09-18", "close_day": "Saturday", "description": "Some Dummy", "is_open": true, "weekend_business_hours": "08-12", "user_shop_id": 0}, "after": {"id": 6, "latitude": 2.921318, "longitude": 101.655938, "name": "XYZ - edited", "phone_number": "+12345678", "place_id": "P007", "rating": 5.0, "type": "Food 2", "website": "https://xyz.me", "address_id": 5, "closing_hours": null, "email": "[email protected]", "max_attendance": 11, "opening_hours": null, "business_hours": "09-18", "close_day": "Saturday", "description": "Some dummy", "is_open": true, "weekend_business_hours": "08-12", "user_shop_id": 0}, "source": {"version": "1.6.0.Final", "connector": "mysql", "name": "bookingdev_sqip_local", "ts_ms": 1629267837000, "snapshot": "false", "db": "booking", "sequence": null, "table": "shop", "server_id": 1, "gtid": null, "file": "mysql-bin.000044", "pos": 26432, "row": 0, "thread": null, "query": null}, "op": "u", "ts_ms": 1629267836453, "transaction": null}

apart of that, I also removed the custom deserializer and custom POJO as schema is already installed on the schema registry.

Now the question remains, how do I obtain debezium's schema generated from schema-registry and convert the message to the correct Java POJO to be executed further?


Update 19.08.2021

After discussion with @OneCricketeer, I made adjustment on the logic for Consumer as below

 public void listen(ConsumerRecord<Integer, GenericRecord> messages, Acknowledgment ack) throws JsonProcessingException {
       
        log.info(messages.key());
        log.info(messages.value());

      

        Shop shop = new ObjectMapper().readValue(messages.value().get("after").toString(), Shop.class);
        log.info("NEW VALUE #####-> " + shop.getName());
        
        //other logic here.

        ack.acknowledge();
    }

But I got another error:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.0.jar:2.7.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition bookingdev_sqip_local.booking.shop-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class bookingdev_sqip_local.booking.shop.Key specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Checked the Schema-Registry Debezium created two subjects - one for key and one for value.

["bookingdev_sqip_local.booking.shop-value","bookingdev_sqip_local.booking.shop-key"]

Looks like the error due to unable to map the schema for the key.


Solution

  • Ok after struggling with this Spring Boot <-> Kafka Connector <-> Debezium CDC MySQL. I got a working application.

    The architecture:

    MySQL(Producer) <-> Debezium CDC Kafka Connect <-> Kafka <-> SpringBoot (Consumer) 
    

    I am using Schema-Registry to store the schema configuration.

    ShopConsumerConfig.java

    @EnableKafka
    @Configuration
    public class ShopsConsumerConfig {
    
        private final KafkaProperties kafkaProperties;
    
        public ShopsConsumerConfig(KafkaProperties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Shop> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Shop> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(false);
            factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
            factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, Shop> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = kafkaProperties.buildConsumerProperties();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getProperties().get("schema-registry-url"));
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); //this thing is nasty do not turn to true for this!
            props.put(KafkaAvroDeserializerConfig.USE_LATEST_VERSION, true);
            return props;
        }
    
    }
    

    Shop.java or the Java POJO

    import lombok.Data;
    
    @Data
    public class Shop {
    
        Long id;
        Double latitude, longitude;
        String name;
        String phone_number;
        String place_id;
        String type;
        double rating;
        String website;
        int address_id;
        String closing_hours;
        String email;
        int max_attendance;
        String opening_hours;
        String business_hours;
        String close_day;
        String description;
        String is_open;
        String weekend_business_hours;
        Long user_shop_id;
    }
    
    

    and finally the consumer ShopConsumer.java

    @Component
    public class ShopConsumer {
    
        private final Logger log = LogManager.getLogger(ShopConsumer.class);
    
        @KafkaListener(
                groupId = "${spring.kafka.consumer.group-id}",
                topics = "${spring.kafka.consumer.topic}"
        )
        public void listen(ConsumerRecord<?, GenericRecord> messages, Acknowledgment ack) throws JsonProcessingException {
    
            //debugging purposes only TODO remove me
            log.info(messages.key());
            log.info(messages.value());
            log.info(messages.value().getSchema().getField("after"));
    
            //convert the message, obtain the "after" section to get the newly updated value and parse it to Java POJO (in this case Shop.java)
            Shop shop = new ObjectMapper().readValue(messages.value().get("after").toString(), Shop.class);
    
            //debugging purposes only.
            log.info("NEW VALUE #####-> " + shop.getName());
    
            //other logic goes here...
    
            ack.acknowledge();
        }
    
    }
    

    I hope this helps anyone out there who is struggling to understand how to consume the Debezium message.