I have data stream as <string,string> events. I want to get count of events for 10 minutes time windows and output to another topic. Following is my code
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
.count()
.toStream()
.to("output");
but i get error
ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
The result of windowedBy
plus count()
is a key-value pair with type <Windowed<String>, Long>
and thus you need to set a different serde in to()
via Produced
parameter. By default, the serdes from the config will be used that it seems you set to StringSerde/StringSerde
and those obviously don't match the output topic key/value types.
Kafka Streams ships with built-in serdes for windowed types that you can get via Serdes
factory class.