Tags: java, apache-kafka, apache-kafka-streams

Kafka Streams Hopping Window generates thousands of windows


I have the following topology definition for exchange tickers with a hopping window of 24 hours. What I want to achieve is simply to calculate a moving window every second over the last 24 hours. The answer to this is a hopping window, but the problem is that it generates a window for every second over the last and next 24 hours, which ends in an OOM error.

How can I achieve this? A tumbling window is not an option here, since it is static and does not allow duplicates.

    kStreamBuilder.stream(config.input())
            .mapValues(new TickerConverter())
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(24)).advanceBy(Duration.ofSeconds(1)))
            .aggregate(TickerInit::new, new Aggregator(), buildStateStore(STORE))
            .mapValues(new TickerConverter())
            .toStream((key, value) -> key.key())
            .to(config.output().name());

Solution

This is unfortunately just how hopping windows work in Kafka Streams. Think about it this way: if you want a new window every second over 24 hours, you have 24 * 60 * 60 = 86400 windows open per key. Every time a new event comes in, it has to be added to every single one of those windows (and windows that fall out of the 24-hour range have to be expired). Yes, 99.9% of the events are the same in adjacent windows, but Kafka Streams has no way of knowing this.
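To make the 86400 figure concrete, here is a stdlib-only Java sketch of how epoch-aligned hopping windows are assigned: a record belongs to every window `[start, start + size)` whose start is a multiple of the advance and that still contains the record's timestamp. It is a model for illustration, not Kafka Streams code, and `HoppingWindowAssignment` and `windowStartsFor` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Model of epoch-aligned hopping window assignment (not Kafka code).
 * A record with timestamp ts belongs to every window [start, start + size)
 * where start is a multiple of the advance and start <= ts < start + size.
 */
final class HoppingWindowAssignment {
    static List<Long> windowStartsFor(long ts, Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        // Earliest aligned start whose window still contains ts
        // (smallest multiple of advanceMs that is > ts - sizeMs).
        long start = (Math.max(0, ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        // Every later aligned start up to ts also contains ts.
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a 24-hour window advancing by 1 second, `windowStartsFor(ts, Duration.ofHours(24), Duration.ofSeconds(1))` returns 86400 window starts for any timestamp well past the epoch, so each incoming record updates 86400 aggregates; with a 15-second advance it returns 5760.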

If you have 1 MB of data per day and your aggregate keeps the raw events, this would already use 86.4 GB of RAM. Also note that every incoming record updates 86400 window aggregates in the state store, so throughput will be abysmal. Ultimately you need to reduce the cardinality somehow (or buy enough RAM to fit the state of your aggregations 86400 times).
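The memory figure above is simple multiplication. As a sketch, assuming each window's aggregate retains roughly a full window's worth of raw data (1 MB) and using decimal units (1 MB = 10^6 bytes), the estimate can be written as follows; `StateSizeEstimate` and its methods are hypothetical.

```java
import java.time.Duration;

/** Back-of-the-envelope state size for overlapping hopping windows (hypothetical helper). */
final class StateSizeEstimate {
    /** Number of overlapping windows per key; assumes advance divides size evenly. */
    static long windowsPerRecord(Duration size, Duration advance) {
        return size.toMillis() / advance.toMillis();
    }

    /** Worst case: every overlapping window holds its own copy of the raw data. */
    static long worstCaseStateBytes(long bytesPerWindow, Duration size, Duration advance) {
        return bytesPerWindow * windowsPerRecord(size, advance);
    }
}
```

With `bytesPerWindow = 1_000_000` and a 24-hour window, this gives 86,400,000,000 bytes (86.4 GB) for a 1-second advance and 5,760,000,000 bytes (5.76 GB) for a 15-second advance. The estimate ignores store and serialization overhead.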

I ran into a similar problem (trying to get hourly aggregates for data spanning years). However, I controlled the API the data was queried through, so my solution at the time was to aggregate at multiple levels (monthly, daily, hourly) and "sum together" the results on the fly. That came at the cost of a lot of complexity and some latency, since I needed to store the data somewhere and write logic to query it, but at least it worked.
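A minimal in-memory sketch of the multi-level idea, using two levels (hours and seconds) instead of the monthly/daily/hourly levels mentioned above: each event updates exactly one bucket per level, and a range query sums whole hours where possible and falls back to seconds only at the edges. It assumes an additive aggregate such as a sum, non-negative timestamps and whole-second query bounds; the class and method names are hypothetical, and a real version would keep the buckets in a store rather than in `HashMap`s.

```java
import java.util.HashMap;
import java.util.Map;

/** Two-level pre-aggregation (hypothetical): per-hour and per-second sums. */
final class MultiLevelSums {
    static final long SECOND_MS = 1_000L;
    static final long HOUR_MS = 3_600_000L;

    private final Map<Long, Double> perSecond = new HashMap<>();
    private final Map<Long, Double> perHour = new HashMap<>();

    /** Each event touches one bucket per level: two updates, not 86400. */
    void add(long timestampMs, double value) {
        perSecond.merge(Math.floorDiv(timestampMs, SECOND_MS), value, Double::sum);
        perHour.merge(Math.floorDiv(timestampMs, HOUR_MS), value, Double::sum);
    }

    /** Sum over [fromMs, toMs), combining coarse and fine buckets on the fly. */
    double sum(long fromMs, long toMs) {
        double total = 0;
        long t = fromMs;
        while (t < toMs) {
            if (t % HOUR_MS == 0 && t + HOUR_MS <= toMs) {
                // A whole hour fits in the range: use the coarse bucket.
                total += perHour.getOrDefault(t / HOUR_MS, 0.0);
                t += HOUR_MS;
            } else {
                // Partial hour at an edge: fall back to per-second buckets.
                total += perSecond.getOrDefault(t / SECOND_MS, 0.0);
                t += SECOND_MS;
            }
        }
        return total;
    }
}
```

A 24-hour query then reads at most 24 hourly buckets plus 3600 per-second buckets, and writes stay at two bucket updates per event. The price is the extra query-time logic the paragraph above describes.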

Thinking about it again, an alternative could be to aggregate into one-second tumbling windows, use suppress to emit only one event at the end of each interval, and then aggregate those tumbling-window events with a hopping window. This won't help much with RAM, but it will at least save a lot of computation.
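The two-stage alternative can be modelled in plain Java as a sketch. Stage 1 stands in for a one-second tumbling aggregation followed by `suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`, so only one record per key and second reaches stage 2; stage 2 adds those records to the hopping windows. This is not Kafka Streams code: it assumes a sum aggregate (anything combinable from partial results, such as count, min or max, works the same way) and a bucket size that divides both the advance and the window size, and all names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of "tumble per second, then hop" (hypothetical names). */
final class TwoStageHopping {
    record Event(long timestampMs, double value) {}

    static final long BUCKET_MS = 1_000L; // must divide both advance and window size

    /** Adds value to every window [start, start + size) containing ts; returns how many. */
    static int addToHopping(Map<Long, Double> windows, long ts, double value,
                            long sizeMs, long advanceMs) {
        int touched = 0;
        long start = (Math.max(0, ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= ts; start += advanceMs) {
            windows.merge(start, value, Double::sum);
            touched++;
        }
        return touched;
    }

    /** Naive: every raw event updates every hopping window it falls into. */
    static long naive(List<Event> events, Map<Long, Double> out, long sizeMs, long advanceMs) {
        long updates = 0;
        for (Event e : events) {
            updates += addToHopping(out, e.timestampMs(), e.value(), sizeMs, advanceMs);
        }
        return updates;
    }

    /**
     * Two-stage: sum events per one-second bucket (stage 1), then feed one record
     * per non-empty bucket into the hopping windows (stage 2). Returns only the
     * hopping-window updates; stage 1 adds one cheap bucket update per event.
     */
    static long twoStage(List<Event> events, Map<Long, Double> out, long sizeMs, long advanceMs) {
        Map<Long, Double> perBucket = new TreeMap<>();
        for (Event e : events) {
            long bucketStart = Math.floorDiv(e.timestampMs(), BUCKET_MS) * BUCKET_MS;
            perBucket.merge(bucketStart, e.value(), Double::sum);
        }
        long updates = 0;
        for (Map.Entry<Long, Double> b : perBucket.entrySet()) {
            updates += addToHopping(out, b.getKey(), b.getValue(), sizeMs, advanceMs);
        }
        return updates;
    }
}
```

With a 10-second window advancing by 1 second, five events spread over two seconds cause 50 hopping-window updates in the naive version but only 20 in the two-stage version, and both produce identical window sums. The saving grows with the number of events per second, while the number of windows held in state stays the same, which is why RAM usage barely improves.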

You could also ask your PM whether getting this data down to one-second resolution is worth the cost, or whether a 15-second advance would suffice as well.