Iam new to kafka streams and I have been working with ktable . Iam trying to detect dulicate events in kafka streams using the ktable and remove the duplicate events. As a inital step I tried building a simple Ktable but it throws error.
The code for creating ktable is shown below.
AvroDeserializer avroDeserializer = new AvroDeserializer();
AvroSerializer avroSerializer = new AvroSerializer();
Serde<GenericRecord> keySerde = Serdes.serdeFrom(avroSerializer, avroDeserializer);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> records = builder.stream(topics, Consumed.with(Serdes.String(), keySerde));
records.map((key, value) -> KeyValue.pair(String.valueOf(value.get("eventId")), value))
.map((windowedUserId, count) -> new KeyValue<>(windowedUserId, count))
.groupByKey()
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.to("topic",Produced.with(Serdes.String(),Serdes.Long()) );
But everytime I run this this throws following error.
2021-06-22 20:29:33.247 ERROR 14680 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [test-app-36b983af-8ed0-4a6b-be17-7a9ada214774] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic test-app-store-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.avro.generic.GenericData$Record). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
Can someone help me understand the problem.
The error comes from .groupByKey()
while producing to the -repartition
topic, and it is using the default Serdes in your StreamsConfig
You would need to specify the Serdes for it using .groupByKey(Grouped.with(keySerde, valueSerde))
, or modify your defaults in the StreamsConfig like the error says