Hi I am trying out the latest Spring cloud stream framework for Kafka. However, for String and Double its working fine but when I try to send a Java POJO class, it throws the below exception.
I have tried various configuration for serialization and deserialization but nothing seems to be working. I am able to produce the message from the Supplier, as json but the consumers are not able to process it due to the error.
Any suggestions for the issue would be appreciated. Thanks
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9), failedMessage=GenericMessage [payload=byte[72], headers={kafka_offset=804, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@24ee2e4c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=random-topic-2, kafka_receivedTimestamp=1624778145945, kafka_groupId=mypublish-reader-group, target-protocol=kafka}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2371) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2339) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2300) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2214) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2021) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1703) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9)
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398) ~[spring-integration-kafka-5.5.1.jar!/:5.5.1]
Below is the POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sample</groupId>
<artifactId>message</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>message</name>
<description>Spring messaging sample</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Below is the application
@Bean
public Supplier<Grade> random() {
return () -> {
System.out.println("=========In Supplier --------------------");
return new Grade(UUID.randomUUID().toString(),Math.random()*100);
};
}
@Bean
public Function<Grade, String> publish() {
System.out.println("@@@@@@@@@@@@@@In Function user--------------------");
return user -> "User:: " + user.toString();
}
@Bean
public Consumer<String> log() {
System.out.println("++++++++++++In log --------------------");
return s -> {
System.out.println("Received>> " + s);
};
}
application.yml
spring:
application:
name: kafka-messaging
json:
value:
default:
type: com.msg.Grade
trusted:
packages: com.msg
cloud:
function:
definition: random;publish;log
stream:
bindings:
random-out-0:
destination: random-topic-2
publish-in-0:
destination: random-topic-2
group: mypublish-reader-group
publish-out-0:
destination: message-topic-2
log-in-0:
destination: message-topic-2
group: message-reader-group
kafka:
binder:
brokers: localhost:9092
Finally I found out the issue. Feel extremely stupid because the issue was that there was no default no-arg constructor in the Model class. After adding the no-arg constructor, all works as expected.