Tags: java, spring-boot, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream doesn't deserialise Kafka messages automatically with the specified configuration


I use spring-cloud-stream-binder-kafka and spring-cloud-stream to configure Kafka streaming in a functional way. My Spring Cloud dependencies are imported from

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2023.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

The main dependencies are:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

My YAML configuration:

kafka-conf:
#  server: localhost:9101
  server: localhost:9092
  group: test-streams
  consume-topic-report-details: report_details_topic
  report-details-dlq: report_details_dlq
  produce-topic-report-details: report_details_topic_redirect
  schema: http://localhost:8081

spring:
  application:
    name: ${kafka-conf.group}-streaming
  cloud:
    function:
      definition: reportDetails
    stream:
      bindings:
        reportDetails-in-0:
          contentType: application/*+avro
          destination: ${kafka-conf.consume-topic-report-details}
          group: ${kafka-conf.group}-streaming
        reportDetails-out-0:
          contentType: application/*+avro
          destination: ${kafka-conf.produce-topic-report-details}
      kafka:
        streams:
          binder:
            deserialization-exception-handler: sendToDlq
            configuration:
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
          bindings:
            reportDetails-in-0:
              consumer:
                dlqName: ${kafka-conf.report-details-dlq}
        binder:
          brokers: ${kafka-conf.server}
          schemaRegistryClient:
            endpoint: ${kafka-conf.schema}

The issue details

The consumed messages are serialised with io.confluent.kafka.serializers.KafkaAvroSerializer. I expect my service to automatically use io.confluent.kafka.serializers.KafkaAvroDeserializer for every message in the stream, but it seems something is ignored in my YAML configuration.

As a result, my stream fails

    @Bean
    Function<ReportDetails, ReportDetails> reportDetails() {
        return data -> {
            log.info("input reportDetails: {}", data);
            return data;
        };
    }

with the exception:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app')
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958)

On the other hand, I tried deserialising it directly (it works):

    @Bean
    Function<byte[], byte[]> filterAbsence() {
        return dto -> {
            SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5);
            try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) {
                // deserialize() returns Object, so an explicit cast to the expected type is required
                AbsenceDto obj = (AbsenceDto) deserializer.deserialize("report_details_topic", dto);
                log.info("received message: {}", obj);
            }
            log.info("received message: {}", dto);
            return dto;
        };
    }

I suspect that my DLQ is configured incorrectly as well. When my consumer fails, I expect the messages to be redirected to the DLQ, but it is empty.

Questions:

  1. How do I fix my YAML configuration so that deserialisation works automatically?
  2. How do I fix my DLQ configuration, or confirm whether it is already configured correctly?

Solution

  • From the dependencies you show above and the YAML configuration, it looks like you intend to use Kafka Streams. However, the code you shared does not use the Kafka Streams binder in Spring Cloud Stream; it uses the regular Kafka binder built on the Kafka client libraries (regular producer/consumer). First of all, we need to make sure that this assumption is true. For example, the following code has nothing specific to Kafka Streams; it is a regular Java function that will be intercepted by the regular Kafka binder (spring-cloud-stream-binder-kafka in your dependencies).

    @Bean
    Function<ReportDetails, ReportDetails> reportDetails() {
        return data -> {
            log.info("input reportDetails: {}", data);
            return data;
        };
    }
    

    If that assumption is true, you can remove your configuration under spring.cloud.stream.kafka.streams.binder and use spring.cloud.stream.kafka.binder. The remainder of this answer is under this assumption.
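
    For contrast, a function that actually targets the Kafka Streams binder operates on KStream types rather than on individual payloads. The sketch below is only illustrative (the bean name reportDetailsStream is made up for this example):

    @Bean
    Function<KStream<String, ReportDetails>, KStream<String, ReportDetails>> reportDetailsStream() {
        // requires org.apache.kafka.streams.kstream.KStream; the Kafka Streams binder
        // binds the whole stream instead of handing records to the function one by one
        return stream -> stream.peek((key, value) -> log.info("input reportDetails: {}", value));
    }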

    Since you are using Avro, you must provide an appropriate de/serializer -- either a message converter or a Kafka de/serializer. If you'd like to use the first option by providing a message converter, please take a look at this issue for some insights.
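
    To give a rough idea of what the message-converter option involves on the consuming side, here is a sketch of a custom converter that delegates to Confluent's KafkaAvroDeserializer. The class name AvroSchemaRegistryMessageConverter is made up for illustration and this is only one possible way to wire it; it assumes the records were produced with KafkaAvroSerializer and that ReportDetails is a generated SpecificRecord class:

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.specific.SpecificRecord;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.converter.AbstractMessageConverter;
    import org.springframework.util.MimeType;

    import java.util.Map;

    // Illustrative sketch only - not part of the original answer
    public class AvroSchemaRegistryMessageConverter extends AbstractMessageConverter {

        private final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

        public AvroSchemaRegistryMessageConverter(String schemaRegistryUrl) {
            super(new MimeType("application", "*+avro"));
            // specific.avro.reader makes the deserializer return generated classes
            // (e.g. ReportDetails) instead of GenericRecord
            deserializer.configure(Map.of(
                    "schema.registry.url", schemaRegistryUrl,
                    "specific.avro.reader", "true"), false);
        }

        @Override
        protected boolean supports(Class<?> clazz) {
            return SpecificRecord.class.isAssignableFrom(clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            // the topic name only influences subject-name lookup inside the deserializer
            return deserializer.deserialize("report_details_topic", (byte[]) message.getPayload());
        }
    }

    Declaring such a converter as a @Bean should add it to the binder's converter chain, but whether it is actually picked for your binding still depends on the contentType you configure.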

    If you go with the second option, you must enable native de/serialization on Kafka by turning on nativeEncoding for the producer and nativeDecoding for the consumer in Spring Cloud Stream. In this case, you also need to provide a proper Avro Kafka de/serializer. You can use the one from Confluent for that - kafka-avro-serializer (a typical Maven declaration is sketched right below). Once you add this dependency, you need to make the corresponding configuration changes; the YAML after the dependency sketch is something that I can infer from your original configuration.
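
    As a side note, the Confluent artifact is published to the Confluent Maven repository rather than Maven Central, so a typical declaration looks like this (the version property is a placeholder you would set yourself):

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>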

    spring:
      application:
        name: ${kafka-conf.group}-streaming
      cloud:
        function:
          definition: reportDetails
        stream:
          bindings:
            reportDetails-in-0:
              destination: ${kafka-conf.consume-topic-report-details}
              group: ${kafka-conf.group}-streaming
              consumer: 
                use-native-decoding: true
            reportDetails-out-0:
              destination: ${kafka-conf.produce-topic-report-details}
              producer:
                use-native-encoding: true
          kafka:
            bindings:
              reportDetails-in-0:
                consumer:
                  configuration:
                    schema.registry.url: http://localhost:8081
                    value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
              reportDetails-out-0:
                producer:
                  configuration:
                    schema.registry.url: http://localhost:8081
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            binder:
              brokers: ${kafka-conf.server}
    

    Pay attention to the way the YAML configuration hierarchy has changed. First of all, the Kafka Streams binder properties are gone now. Then we set nativeEncoding/decoding on the core bindings. Note that there is no longer a need to set contentType, since we are using native de/serialization on Kafka. Then, specifically, on the kafka bindings (..kafka.bindings..), we set Kafka-specific consumer/producer configuration (such as schema.registry.url, value.deserializer, value.serializer, etc.).
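
    One Confluent property worth calling out, since ReportDetails appears to be a generated Avro class: by default, KafkaAvroDeserializer returns GenericRecord instances, so you will most likely also want specific.avro.reader set to true next to the other consumer settings, along the lines of:

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              reportDetails-in-0:
                consumer:
                  configuration:
                    specific.avro.reader: true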

    This should work. If not, please feel free to provide a minimal standalone sample, and we can triage it further.

    Here is a sample that uses both Kafka and Kafka Streams binders with Avro that you may want to use as a reference: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro. See the configuration in that app in particular.