java · apache-kafka · apache-kafka-streams

Kafka Streams count throwing Exception


I am new to Kafka Streams and I have been working with KTable. I am trying to detect duplicate events in Kafka Streams using a KTable and remove them. As an initial step I tried building a simple KTable, but it throws an error.

The code for creating the KTable is shown below.

        AvroDeserializer avroDeserializer = new AvroDeserializer();
        AvroSerializer avroSerializer = new AvroSerializer();
        Serde<GenericRecord> keySerde = Serdes.serdeFrom(avroSerializer, avroDeserializer);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, GenericRecord> records = builder.stream(topics, Consumed.with(Serdes.String(), keySerde));

        records.map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
                .map((windowedUserId, count) -> new KeyValue<>(windowedUserId, count))
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("topic", Produced.with(Serdes.String(), Serdes.Long()));

But every time I run this, it throws the following error.

2021-06-22 20:29:33.247 ERROR 14680 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [test-app-36b983af-8ed0-4a6b-be17-7a9ada214774] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic test-app-store-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.avro.generic.GenericData$Record). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

Can someone help me understand the problem?


Solution

  • The error comes from `.groupByKey()`: because the preceding `.map()` changes the key, Kafka Streams inserts a repartition topic (`test-app-store-repartition`), and when producing to it, it falls back to the default Serdes in your StreamsConfig (here `ByteArraySerializer`), which do not match your actual `String` key and `GenericRecord` value.

    You need to specify the Serdes for that step explicitly via `.groupByKey(Grouped.with(keySerde, valueSerde))`, or change the defaults in your StreamsConfig as the error message suggests.
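    A sketch of the corrected part of the topology, reusing the Avro serde from the question (named `keySerde` there, although after the `map` it actually serves as the *value* serde) — this assumes your `AvroSerializer`/`AvroDeserializer` pair works for `GenericRecord` values:

        // map() changes the key type to String, so the repartition topic
        // can no longer use the default (ByteArray) Serdes.
        records
                .map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
                // Provide Serdes explicitly for the repartition topic:
                // String keys, Avro GenericRecord values.
                .groupByKey(Grouped.with(Serdes.String(), keySerde))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("topic", Produced.with(Serdes.String(), Serdes.Long()));

    Note that the second `.map((windowedUserId, count) -> new KeyValue<>(windowedUserId, count))` in your code is an identity mapping and can simply be dropped; it also forces the repartition flag without doing any work.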