
org.springframework.kafka.support.serializer.ErrorHandlingDeserializ' of type serializer.ErrorHandlingDeserializer] while setting constructor argument


Following the question "How to handle the exception at consumer in Spring?", I am trying to convert a Java bean definition into an XML bean definition and am getting the error below.

What might be wrong here?

Error:

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cf' defined in class path resource [context.xml]: Cannot resolve reference to bean 'valueDeserializer' while setting constructor argument
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:377)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:135)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveConstructorArguments(ConstructorResolver.java:681)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:202)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1350)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:558)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:518)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:952)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:615)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:144)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:85)
    at com.example.kafka.MyMainApp.main(MyMainApp.java:15)
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'valueDeserializer' defined in class path resource [context.xml]: Unsatisfied dependency expressed through constructor parameter 0: Could not convert argument value of type [java.lang.String] to required type [org.apache.kafka.common.serialization.Deserializer]: Failed to convert value of type 'java.lang.String' to required type 'org.apache.kafka.common.serialization.Deserializer'; Cannot convert value of type 'java.lang.String' to required type 'org.apache.kafka.common.serialization.Deserializer': no matching editors or conversion strategy found
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:756)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:236)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1350)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:558)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:518)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:365)
    ... 17 more

EDIT-1: After changing a few things, I am now getting the error below.

XML (context.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

    <util:map id="utilMap" map-class="java.util.HashMap">
        <entry key="bootstrap.servers" value="localhost"/>
        <entry key="auto.offset.reset" value="latest"/>
        <entry key="group.id" value="group1" />
        <entry key="client.id" value="my-client-id" />
        <entry key="max.poll.records" value="1"/>
        <entry key="value.class.name" value="com.example.kafka.model.Employee" />
    </util:map>

    <bean id="keyDeserializer" class="org.apache.kafka.common.serialization.StringDeserializer" />

    <bean id="cf" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg index="0" ref="utilMap" />
        <constructor-arg index="1" ref="keyDeserializer" />
        <constructor-arg index="2" ref="valueDeserializer" />
    </bean>

    <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
        <constructor-arg name="delegate">
            <bean class="com.example.kafka.serdes.AppJsonDeserializer"/>
        </constructor-arg>
    </bean>

    <bean id="cp" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics" value="t-employee"/>
        <property name="groupId" value="group1"/>
        <property name="messageListener" ref="myListener"/>
    </bean>

    <bean id="containerListener" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg index="0" ref="cf"/>
        <constructor-arg index="1" ref="cp" />
    </bean>


    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>

Error:

10:28:18.340 [main] WARN  o.s.c.s.ClassPathXmlApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'containerListener'
Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'containerListener'
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:186)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:363)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:160)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:128)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:968)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:618)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:144)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:85)
    at com.example.kafka.MyMainApp.main(MyMainApp.java:15)
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Class (java.lang.String and java.lang.Class are in module java.base of loader 'bootstrap')
    at com.example.kafka.serdes.AppJsonDeserializer.configure(AppJsonDeserializer.java:35)
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.configure(ErrorHandlingDeserializer.java:137)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:194)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:479)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:461)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:438)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:415)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:382)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:359)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:890)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:393)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:510)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:183)
    ... 9 more

Solution

  • Your bean definition is wrong:

    <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
        <constructor-arg name="delegate" value="com.example.kafka.serdes.AppJsonDeserializer" />
    </bean>
    

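    The delegate constructor argument expects a Deserializer instance, not a String. The value attribute passes the class name as a plain String, and Spring has no editor or converter from String to Deserializer, which is exactly what the "no matching editors or conversion strategy found" message reports. So the delegate has to be declared as a nested bean: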

    <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
        <constructor-arg name="delegate">
           <bean class="com.example.kafka.serdes.AppJsonDeserializer"/>
        </constructor-arg>
    </bean>
    

    Consider migrating to Java and annotation-based configuration to avoid the extra burden of XML.
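The nested-bean fix addresses the first stack trace. The EDIT-1 trace fails later, at AppJsonDeserializer.configure, with "String cannot be cast to Class". The source of AppJsonDeserializer is not shown, so this is an assumption, but the trace is consistent with configure casting the "value.class.name" entry to Class, while an XML value="..." attribute always yields a String. In Java configuration the map could hold Employee.class directly. For XML, one possible workaround is to have the deserializer accept both forms. The sketch below uses only the JDK; ValueClassResolver and resolve are hypothetical names, not part of spring-kafka or of the question's code.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of spring-kafka or the question's code. */
final class ValueClassResolver {

    private ValueClassResolver() {
    }

    /**
     * Reads a target type from a config map. Accepts either a Class instance
     * (typical when the map is built in Java) or a fully qualified class name
     * as a String (what an XML value="..." attribute produces).
     */
    static Class<?> resolve(Map<String, ?> configs, String key) {
        Object raw = configs.get(key);
        if (raw instanceof Class) {
            return (Class<?>) raw;
        }
        if (raw instanceof String) {
            try {
                // Load the class by name instead of casting the String to Class.
                return Class.forName((String) raw);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(
                        "Cannot load class for '" + key + "': " + raw, e);
            }
        }
        // Covers a missing entry (null) and any unexpected value type.
        throw new IllegalArgumentException(
                "Expected a Class or String for '" + key + "' but got: " + raw);
    }
}
```

Inside configure(Map<String, ?> configs, boolean isKey), a call such as ValueClassResolver.resolve(configs, "value.class.name") could then replace the direct cast, assuming that is what line 35 of AppJsonDeserializer does.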