apache-kafka, apache-kafka-streams, spring-kafka

How to aggregate data hourly?


Whenever a user favorites some content on our site, we collect the event. The plan was to commit the aggregated favorites per content item every hour and update the total favorite count in the DB.

We were evaluating Kafka Streams and followed the word-count example. Our topology is simple: produce to a topic A, then read from it and write the aggregated data to another topic B. Every hour, consume events from topic B and commit them to the DB.

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    return new StreamsConfig(props);
}

@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
    StreamsBuilder builder = streamBuilder();
    KStream<String, String> source = builder.stream(topic);
    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
            .to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
    streams.start();
    return source;
}

@Bean
public StreamsBuilder streamBuilder() {
    return new StreamsBuilder();
}

However, when I consume this topic B it gives me aggregated data from the beginning. My question is: is there some provision whereby I can consume the previous hour's grouped data, commit it to the DB, and then have Kafka forget the previous hour's data and give new data each hour rather than a cumulative sum? Is the topology design correct, or can we do something better?


Solution

  • If you want to get one aggregation result per hour, you can use a windowed aggregation with a window size of 1 hour.

    stream.groupBy(...)
          .windowedBy(TimeWindows.of(1 * 3600 * 1000))
          .count(...)
    

    Check the docs for more details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

    The output type is Windowed<String> for the key (not String). You need to provide a custom Windowed<String> serde, or convert the key type. Consult the SessionWindowsExample for a reference.
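
    Putting both points together, here is a minimal sketch (not the asker's exact code) of an hourly favorite count, assuming Kafka Streams 2.x, a String content id as the record key, and hypothetical topic names "favorites" and "favorites-hourly":

        import java.time.Duration;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.common.utils.Bytes;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.kstream.Produced;
        import org.apache.kafka.streams.kstream.TimeWindows;
        import org.apache.kafka.streams.state.WindowStore;

        public class HourlyFavoriteCounts {

            // Hypothetical topic names -- adjust to your setup.
            static final String INPUT_TOPIC = "favorites";          // one event per favorite, key = content id
            static final String OUTPUT_TOPIC = "favorites-hourly";  // one count per content id per hour window

            public static void buildTopology(StreamsBuilder builder) {
                KStream<String, String> favorites = builder.stream(INPUT_TOPIC,
                        Consumed.with(Serdes.String(), Serdes.String()));

                favorites
                        .groupByKey()
                        // tumbling 1-hour windows: each hour is counted independently,
                        // so downstream sees per-hour counts rather than a running total
                        .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hourly-favorite-counts"))
                        .toStream()
                        // the key is Windowed<String> here; flatten it to "contentId@windowStartMs"
                        // so the output topic can keep a plain String key serde
                        .map((windowedKey, count) -> KeyValue.pair(
                                windowedKey.key() + "@" + windowedKey.window().start(), count))
                        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
            }
        }

    Alternatively, instead of mapping the key to a String, you can keep the Windowed<String> key and serialize it with WindowedSerdes.timeWindowedSerdeFrom(String.class). Note that each window is updated continuously as events arrive, so the hourly consumer should treat the latest value per key and window as the final count for that hour.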