Tags: spring-boot, apache-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Using Spring Cloud Stream Kafka Streams with Avro input/output with nativeEncoding/decoding=false


We're testing the use of Kafka Streams via Spring Cloud Stream function support with Avro input/output records, but setting nativeEncoding=false and nativeDecoding=false in order to use a custom MessageConverter where we do the Avro conversion.

The default serdes are StringSerde for keys and ByteArraySerde for values.
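For context, a minimal sketch of binding configuration that matches this setup (the destination names are illustrative, not our actual config; the useNativeDecoding/useNativeEncoding flags are the standard Spring Cloud Stream binding properties):

    # Message conversion instead of native Serdes on the function bindings
    spring.cloud.function.definition=wordsCount
    spring.cloud.stream.bindings.wordsCount-in-0.destination=words
    spring.cloud.stream.bindings.wordsCount-in-0.consumer.useNativeDecoding=false
    spring.cloud.stream.bindings.wordsCount-out-0.destination=word-counts
    spring.cloud.stream.bindings.wordsCount-out-0.producer.useNativeEncoding=false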

Everything is OK when we use only a KStream-to-KStream function, for example:

    @Bean
    public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
      return input -> input
          .flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\\W+")))
          .map((key, value) -> new KeyValue<>(value, value))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
          .count()
          .toStream()
          .map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
    }

but when we try a slightly more complex example involving an input KTable, like this:

    @Bean
    public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> userClicksRegionKTableAvro() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region.getName().toString(), clicks.getCount()))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

(The DataRecordAvro class has only two members: CharSequence name; Long count;)
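Its Avro schema would look roughly like this (a sketch; the namespace is taken from the exception below, and the exact nullability of the fields may differ):

    {
      "type": "record",
      "name": "DataRecordAvro",
      "namespace": "com.xxxx.kstreams.fixtures.avro",
      "fields": [
        { "name": "name", "type": "string" },
        { "name": "count", "type": "long" }
      ]
    }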

When the first record is received, this exception is thrown:

    ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
    Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
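For reference, the "provide correct Serdes via method parameters" hint in the message refers to plain Kafka Streams usage, where a Serde can be passed explicitly instead of falling back to the defaults, along these lines (dataRecordAvroSerde is a hypothetical Serde<DataRecordAvro>, and the topic name is illustrative; with Spring Cloud Stream the binder builds the KTable from the binding, so this is not directly available to us):

    StreamsBuilder builder = new StreamsBuilder();
    // The value Serde is supplied explicitly via Consumed instead of the default ByteArraySerde
    KTable<String, DataRecordAvro> usersRegion =
        builder.table("user-regions", Consumed.with(Serdes.String(), dataRecordAvroSerde));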

The processor where the exception is thrown seems to be:

    KSTREAM-LEFTJOIN-0000000011:
        states:     [user-regions-avro-STATE-STORE-0000000008]

We have no idea why it doesn't work in this case. Perhaps the leftJoin operation persists information to an internal topic where useNativeEncoding/Decoding=false is not taken into account? But then why does the KStream-to-KStream example above work? We thought the Avro conversion was done only at the start and end of the topology, so why this casting exception when using leftJoin?

Here is another example that works OK (without Avro input records, leaving the consumer useNativeDecoding at its default of true):

    @Bean
    public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, DataRecordAvro>> userClicksRegionKTable() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

Please help!


Solution

  • For the Kafka Streams binder in Spring Cloud Stream, we recommend using native decoding/encoding with Serdes unless you have strong reasons for relying on the message conversion approach. What is the use case that forces you to go with message converters here? In practice, using message converters for serialization in Kafka Streams applications in Spring Cloud Stream adds an extra layer to your topology and makes it deeper, hence the recommendation to use native decoding/encoding (see the sketch below).
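    For illustration, a minimal sketch of what native Serde-based decoding/encoding could look like with Confluent's Avro serde (the Schema Registry URL is a placeholder; native decoding/encoding is the default, so it is enough to drop the useNativeDecoding/useNativeEncoding=false overrides):

    # Kafka Streams default Serdes, passed through to StreamsConfig by the binder
    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
    spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    # Needed by the Avro serde (placeholder URL)
    spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8081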

    As you noted, for KTable the binder always uses native decoding - at the moment, it is not possible to use message converters there. When you turn off useNativeDecoding on the KTable binding, the binder ignores it and simply uses the default byte serde. I suggest going with the default on the KTable binding and then adding the following bean to your application configuration.

    @Bean
    public Serde<DataRecordAvro> dataRecordAvroSerde() {
      // Sketch assuming Confluent's SpecificAvroSerde and a Schema Registry
      // (placeholder URL); any Serde<DataRecordAvro> implementation will do.
      SpecificAvroSerde<DataRecordAvro> serde = new SpecificAvroSerde<>();
      serde.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
      return serde;
    }
    

    This way the binder will detect this bean, realize that the Serde type matches the type in the function signature, and use it for those inputs.

    If you have further issues with this app, feel free to share an MCRE (minimal, complete, reproducible example); we can take a further look then.