I have a Spring Cloud Stream application where I'm trying to leverage functional interfaces. I got a proof of concept working with plain Strings, but now that I'm trying to handle Avro-generated messages, I can't get the deserialization to work. I've looked through similar questions, but they either aren't using Avro, or they got Avro working and my attempts to copy their configuration were unsuccessful.
Below are the code and the configuration. I've tried a variety of different yml entries without any luck, going so far as to try writing my own deserializer.
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

public class StreamFunctions {

    @Bean
    public Function<KStream<String, MyAvroInputClass>, KStream<String, MyAvroOutputClass>> myConsumer() {
        return input -> {
            // "log" is a logger declared elsewhere in the class (e.g. via Lombok)
            log.error("Received event");
            // ignore the message
            log.error("Ignoring event");
            return input.map((k, v) -> new KeyValue<>(null, null));
        };
    }
}
Application.yml
---
spring:
  application:
    name: functional-streams
  main:
    banner-mode: off
  cloud:
    function:
      definition: myConsumer
    stream:
      default-binder: kafka
      bindings:
        myConsumer-in-0:
          destination: input-topic
          content-type: application/*+avro
          consumer:
            use-native-decoding: true
            auto-startup: true
        myConsumer-out-0:
          content-type: application/*+avro
          destination: output-topic
          producer:
            use-native-encoding: true
      kafka:
        streams:
          binder:
            applicationId: ${info.app.name}
            configuration:
              commit.interval.ms: 100
              schema.registry.url: http://localhost:9091
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:9092
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: http://localhost:9092
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
          configuration:
            value:
              deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring:
              deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
Gradle file
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-aop")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-log4j2")
implementation("org.apache.avro:avro:1.11.0")
implementation("io.confluent:kafka-avro-serializer:7.1.1")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.4")
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka-streams
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.2.4")
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-schema
testImplementation("org.springframework.cloud:spring-cloud-stream-schema:2.2.1.RELEASE")
// implementation("org.springframework.cloud:spring-cloud-schema-registry-client")
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-function-context
implementation("org.springframework.cloud:spring-cloud-function-context:3.2.6")
implementation("com.fasterxml.jackson.core:jackson-core:2.13.3")
implementation("com.fasterxml.jackson.core:jackson-annotations")
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("org.springdoc:springdoc-openapi-ui:1.6.11")
Error Result:
2022-09-06 15:09:39,987 [ERROR] [core-cls-sync-advertising-df1651cc-0ab8-47b5-b7b9-fd307b6a6d8e-StreamThread-1] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: inbound-topic, partition: 0, offset: 3 {}
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 6, 0, 0, 0, 0, 2, 8, 74, 111, 104, 110, 2, 6, 68, 111, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]] from topic [inbound-topic]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.8.jar:2.8.8]
I'm assuming something is wrong with my yml file, but I can't figure out what.
I think this is a configuration issue. The stack trace shows that Spring Kafka's JsonDeserializer is being invoked, so your Avro settings are not reaching the Kafka Streams binder at all. Please compare your configuration with this sample application, paying particular attention to how it configures the binder: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro
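For reference, here is a minimal sketch in the spirit of that sample, not a drop-in fix. It assumes you use Confluent's SpecificAvroSerde (from the io.confluent:kafka-streams-avro-serde artifact, added alongside kafka-avro-serializer) and it reuses your topic names and registry URL, which you would adjust to your environment. The key point is that the Kafka Streams binder does its (de)serialization with Kafka Streams Serdes, so the Avro serde and schema.registry.url belong under spring.cloud.stream.kafka.streams.binder.configuration, not under the plain Kafka binder's consumer-properties/producer-properties.

spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: input-topic
        myConsumer-out-0:
          destination: output-topic
      kafka:
        streams:
          binder:
            applicationId: ${info.app.name}
            configuration:
              commit.interval.ms: 100
              # Serdes used by Kafka Streams for all bindings unless overridden
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
              # Passed through to the Confluent Avro serde
              schema.registry.url: http://localhost:9091
              specific.avro.reader: true

With something like this in place, the ErrorHandlingDeserializer/KafkaAvroDeserializer entries under spring.cloud.stream.kafka.binder should not be needed for the streams binding, since that section only applies to the message-channel Kafka binder.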