Search code examples
javaspringapache-kafkaspring-kafka

MessageConversionException during serialization in spring-kafka?


To show what I mean I make simple project.

Dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

This is all kafka configuration in code:

@Configuration
public class KafkaSerializationConfig implements SmartInitializingSingleton {

    private final KafkaProperties kafkaProperties;

    public KafkaSerializationConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties());

        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(new NewTopic("demo", 2, (short) 2));
        client.createTopics(newTopics);

        client.close();
    }

    private static ObjectMapper mapper = new ObjectMapper()
        .registerModules(new Jdk8Module(), new JavaTimeModule());

    public static class KafkaSerializer extends JsonSerializer<Object> {
        public KafkaSerializer() {
            super(mapper);
        }
    }

    public static class KafkaDeserializer extends JsonDeserializer<Object> {
        public KafkaDeserializer() {
            super(mapper);
        }
    }

}

application.yml file:

    spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: sample-cutter
      auto-offset-reset: earliest
      properties.spring.json.trusted.packages: "*"
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaSerializer

So this app can be up and its works. But let's try to use it. Let's image we have two services: the server and the client. So we have two classes with the same fields:

DTO for the client:

public class GettingUser {

    private String firstName;
    private String lastName;

    public GettingUser() {
    }

    public GettingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

DTO for the server:

public class SendingUser {

    private String firstName;
    private String lastName;

    public SendingUser() {
    }

    public SendingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

And eventually let's try to use it:

@RestController
public class SpringSerializationDemoController {

    private final KafkaTemplate<Long, SendingUser> template;
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public SpringSerializationDemoController(KafkaTemplate<Long, SendingUser> template) {
        this.template = template;
    }

    @GetMapping("/start-demo")
    public String startDemo() {
        SendingUser user = new SendingUser("John", "Smith");
        template.send("demo", user);
        return "OK";
    }

    @KafkaListener(topics = "demo")
    public void consumeCutSample(GettingUser user) {
        logger.info("Got user: {}", user);
    }
}

I can see exception which show my app can't cast SendingUser to GettingUser. Exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.springkafkasimpledemo.controller.SpringSerializationDemoController.consumeCutSample(com.example.springkafkasimpledemo.domain.GettingUser)]
Bean [com.example.springkafkasimpledemo.controller.SpringSerializationDemoController@53601a9c]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2037) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2025) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    ... 13 common frames omitted

But why does it do it? My object mapper doesn't use types. I can see it in my console:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic demo
{"firstName":"John","lastName":"Smith"}

Can't understand why spring-kafka tries to cast SendingUser to GettingUser.


Solution

  • If you are using the Spring JSON (de)serializer (both sides), you need to configure the type mapping - see https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#json-serde

    Map the source class to a token on the sending side and map the token to the required class on the receiving side.

    Or you can disable the use of headers in the deserializer and configure a default type. See setUseTypeHeaders().

    EDIT

    For more sophisticated types, e.g. generics, you should configure the deserializer to call a method that returns a JavaType.

    e.g. for a List<Foo>:

    public static JavaType returnType(String topic, byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }
    
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.Application.returnType
    

    Use a TypeReference or constructParametricType for your Event<SourceFile>. See Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double