Spring Cloud Stream Function won't use Avro Deserializer


I have a Spring Cloud Stream application in which I'm trying to use functional interfaces. I got a proof of concept working with plain Strings, but now that I'm handling Avro-generated messages, I cannot get deserialization to work. I've looked through similar questions, but they either don't involve Avro or they got Avro working with a configuration that did not work when I tried it.

Below are the code and the configuration. I've tried a variety of yml entries without any luck, even going so far as to try writing my own deserializer.

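// Imports, @Configuration and the logger declaration are assumed; the original snippet omitted them.
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamFunctions {

    private static final Logger log = LoggerFactory.getLogger(StreamFunctions.class);

    @Bean
    public Function<KStream<String, MyAvroInputClass>, KStream<String, MyAvroOutputClass>> myConsumer() {
        return input -> {
            log.error("Received event");
            // ignore the message
            log.error("Ignoring event");
            return input.map((k, v) -> new KeyValue<>(null, null));
        };
    }

}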

application.yml

---
spring:
  application:
    name: functional-streams
  main:
    banner-mode: off
  cloud:
    function:
      definition: myConsumer
    stream:
      default-binder: kafka
      bindings:
        myConsumer-in-0:
          destination: input-topic
          content-type: application/*+avro
          consumer:
            use-native-decoding: true
            auto-startup: true
        myConsumer-out-0:
          content-type: application/*+avro
          destination: output-topic
          producer:
            use-native-encoding: true
      kafka:
        streams:
          binder:
            applicationId: ${info.app.name}
            configuration:
              commit.interval.ms: 100
              schema.registry.url: http://localhost:9091
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:9092
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: http://localhost:9092
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class:  io.confluent.kafka.serializers.KafkaAvroDeserializer
          configuration:
            value:
              deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring:
              deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

Gradle file


    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-aop")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("org.springframework.boot:spring-boot-starter-log4j2")

    implementation("org.apache.avro:avro:1.11.0")
    implementation("io.confluent:kafka-avro-serializer:7.1.1")

    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.4")
    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka-streams
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.2.4")
    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-schema
    testImplementation("org.springframework.cloud:spring-cloud-stream-schema:2.2.1.RELEASE")
//    implementation("org.springframework.cloud:spring-cloud-schema-registry-client")
    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-function-context
    implementation("org.springframework.cloud:spring-cloud-function-context:3.2.6")

    implementation("com.fasterxml.jackson.core:jackson-core:2.13.3")
    implementation("com.fasterxml.jackson.core:jackson-annotations")
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
    implementation("com.fasterxml.jackson.core:jackson-databind")
    implementation("org.springdoc:springdoc-openapi-ui:1.6.11")

Error Result:

2022-09-06 15:09:39,987 [ERROR] [core-cls-sync-advertising-df1651cc-0ab8-47b5-b7b9-fd307b6a6d8e-StreamThread-1] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: inbound-topic, partition: 0, offset: 3 {}
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 6, 0, 0, 0, 0, 2, 8, 74, 111, 104, 110, 2, 6, 68, 111, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]] from topic [inbound-topic]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.8.jar:2.8.8]

I'm assuming something is wrong with my yml file, but I can't figure out what.


Solution

  • I think this is a configuration issue. Please compare your configuration with this sample application: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro

    In particular, take a look at the configuration file in that sample.
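
The JsonDeserializer frame in the stack trace suggests that the Kafka Streams binder fell back to a JSON Serde for the value, so the Avro deserializer settings were probably never applied to this binding. The consumer-properties and producer-properties under spring.cloud.stream.kafka.binder belong to the message-channel Kafka binder. The Kafka Streams binder is configured under spring.cloud.stream.kafka.streams and works with Serdes rather than plain deserializers. Below is a minimal sketch of a Serde-based configuration, not a copy of the sample's file. It assumes that io.confluent:kafka-streams-avro-serde (which provides SpecificAvroSerde) is added to the build, since it is not in the Gradle file shown, and that the schema registry listens on the port given in the question's streams binder section.

```yaml
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: input-topic
        myConsumer-out-0:
          destination: output-topic
      kafka:
        streams:
          binder:
            configuration:
              # Assumption: use whichever URL your schema registry actually listens on.
              schema.registry.url: http://localhost:9091
          bindings:
            myConsumer-in-0:
              consumer:
                # Binding-level Serdes take the place of key/value deserializer classes.
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            myConsumer-out-0:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
```

With the Serdes set on each binding, the binder should no longer need to infer one from the function's generic types, and the spring.cloud.stream.kafka.binder block can likely be dropped if the application only uses the Kafka Streams binder.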