apache-kafka, avro, apache-kafka-streams, spring-cloud-stream

Getting Kafka Streams Class Cast Exception on the same Class while applying map function


UserRecord.java (autogenerated by Maven Avro plugin)

UserRecord extends SpecificRecordBase implements SpecificRecord

UserRecordSerde.java

UserRecordSerde extends SpecificAvroSerde

application.yml

spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde

StreamListener class (the raw stream arrives with a null key and an Avro-encoded UserRecord value)

@StreamListener
public KStream<Long, ArrayList<UserRecord>> handleUserRecords(@Input KStream<?, UserRecord> userRecordStream) {
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
    userRecordListSerde.configure(serdeConfig, false);

    // userRecordSerde is not declared in this snippet; presumably a configured UserRecordSerde instance.
    return userRecordStream
        // Re-key each record by its user ID (the incoming key is null).
        .map((key, value) -> new KeyValue<>(value.getUserID(), value))
        .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
        // Collect all records that share a key into one list.
        .aggregate(ArrayList::new, (Long key, UserRecord value, ArrayList<UserRecord> agg) -> {
            agg.add(value);
            return agg;
        }, userRecordListSerde)
        .toStream();
}
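
For reference, the result this topology is meant to produce (re-key each record by its user ID, then collect all records with the same key into one list) can be sketched in plain Java without Kafka. This is only an illustration of the intended logic, not of the failure: the User class is a hypothetical stand-in for the Avro-generated UserRecord, and groupByUserId is a made-up helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByUserIdSketch {

    // Hypothetical stand-in for the Avro-generated UserRecord; only the ID matters here.
    public static final class User {
        public final long userId;
        public final String name;

        public User(long userId, String name) {
            this.userId = userId;
            this.name = name;
        }
    }

    // Mirrors map -> groupByKey -> aggregate: key each record by its user ID,
    // start from an empty list per key, and append every record with that key.
    public static Map<Long, ArrayList<User>> groupByUserId(List<User> records) {
        Map<Long, ArrayList<User>> aggregates = new LinkedHashMap<>();
        for (User record : records) {
            aggregates.computeIfAbsent(record.userId, id -> new ArrayList<>()).add(record);
        }
        return aggregates;
    }

    public static void main(String[] args) {
        List<User> input = Arrays.asList(
                new User(1L, "a"), new User(2L, "b"), new User(1L, "c"));
        Map<Long, ArrayList<User>> result = groupByUserId(input);
        System.out.println(result.get(1L).size()); // prints 2
        System.out.println(result.get(2L).size()); // prints 1
    }
}
```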

Exception

java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)

Solution

  • Why not remove spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: UserRecordSerde from the configuration altogether and simply use SpecificAvroSerde directly through the default configuration? That is, set spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.
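
Applied to the application.yml from the question, the suggestion might look roughly like the sketch below. The schema.registry.url line is an assumption that is not part of the answer: it is included because a serde created from the default configuration would otherwise have no registry address, and the URL is the one used in the question's Java code. LongSerde is kept exactly as the question writes it.

```yaml
spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
# The binding-level valueSerde (UserRecordSerde) is removed; the default value serde below applies instead.
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
# Assumption: passes the Schema Registry address through to the default serde.
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: http://localhost:8081
```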