
Kafka Streams : Get count of events in time window


I have a data stream of <String, String> events. I want to count the events in 10-minute time windows and write the counts to another topic. Here is my code:

StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))   // 10-minute windows
            .count()
            .toStream()
            .to("output");

but I get this error:

 ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

Solution

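  • The result of windowedBy() plus count() is a stream of key-value pairs of type <Windowed<String>, Long>, so you need to pass matching serdes to to() via a Produced parameter. Without one, Kafka Streams falls back to the default serdes from the config wherever it has no serde for a type. The error shows a StringSerializer for the key, which cannot serialize a Windowed<String>; the Long value already matches the LongSerializer.

    Kafka Streams ships with built-in serdes for windowed types, which you can get from the WindowedSerdes factory class, for example WindowedSerdes.timeWindowedSerdeFrom(String.class) as the error message itself suggests. Newer releases also provide an overload that takes the window size in milliseconds.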
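A minimal sketch of the fix, assuming Kafka Streams 3.0 or later is on the classpath (for `TimeWindows.ofSizeWithNoGrace` and the two-argument `WindowedSerdes.timeWindowedSerdeFrom`). The class name `WindowedCountApp`, the application id and the broker address are placeholders, not part of the original question. It also passes explicit String serdes when reading and grouping, so the topology no longer depends on the default serdes in the config at all.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Hypothetical application class; assumes Kafka Streams 3.0+.
public class WindowedCountApp {

    // 10-minute tumbling windows, as described in the question.
    static final Duration WINDOW_SIZE = Duration.ofMinutes(10);

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Serde for the Windowed<String> keys produced by windowedBy(...).count().
        // The window size lets a consumer rebuild the window end when deserializing.
        Serde<Windowed<String>> windowedKeySerde =
            WindowedSerdes.timeWindowedSerdeFrom(String.class, WINDOW_SIZE.toMillis());

        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
               .count()
               .toStream()
               // Key is Windowed<String>, value is Long: pass matching serdes explicitly.
               .to("output", Produced.with(windowedKeySerde, Serdes.Long()));

        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-app"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Consumers of `output` then need the same windowed serde (or its deserializer) for the keys. `ofSizeWithNoGrace` is the 3.0+ replacement for the deprecated `TimeWindows.of`; it drops records that arrive after their window has closed, so use `ofSizeAndGrace` if late events must still be counted. If you would rather keep plain string keys in the output topic, an alternative is to flatten the key before writing, e.g. `.map((k, v) -> KeyValue.pair(k.key() + "@" + k.window().start(), v))` followed by `.to("output", Produced.with(Serdes.String(), Serdes.Long()))`.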