apache-kafka apache-kafka-streams

Kafka stream sum data after windowed group


I have a warehouse application in which I need to calculate the total stock on an hourly basis.
All item movement data (additions and removals) is sent to a Kafka stream.
This means I can get the hourly aggregated movement using a windowed Kafka stream, like this:

        sourceStream
                // "ADD" counts as a positive quantity, any other type as a negative one
                .mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .reduce(Long::sum, Materialized.with(stringSerde, longSerde))
                .toStream()
                .to("hourly-movement");

But how can I get the total stock based on this aggregated result?
For example, with this data set, assuming the starting stock is zero:

  • 09:15 : +50 item
  • 09:20 : +10 item
  • 09:50 : +10 item
  • 10:35 : -40 item
  • 10:55 : -20 item

The aggregated stream result (by window) is:

  • item@09:00/10:00 : 70
  • item@10:00/11:00 : -60

I need to create an hourly chart in the frontend, which means I need this data set:

  • item@09:00/10:00 : 70 (initial + movement in one hour)
  • item@10:00/11:00 : 10 (item at 10:00 + movement in next hour, which is 70 - 60)
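
To make the expected numbers concrete: the desired data set is a running total over the per-window movements. A minimal plain-Java sketch of just that arithmetic, with no Kafka involved (the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

class RunningStock {

    // Turns per-window net movements into the stock level at the end of each window.
    static List<Long> runningTotals(long initialStock, List<Long> hourlyMovements) {
        List<Long> totals = new ArrayList<>();
        long stock = initialStock;
        for (long movement : hourlyMovements) {
            stock += movement;   // carry the previous window's stock forward
            totals.add(stock);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Net movements for 09:00/10:00 and 10:00/11:00 from the example above.
        List<Long> movements = List.of(70L, -60L);
        System.out.println(runningTotals(0L, movements)); // prints [70, 10]
    }
}
```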

How can I get such a data set from the stream? The original source stream is read from stream-logistic-movement.


Solution

  • It seems you don't want a windowed aggregation, but an overall aggregation whose current result is emitted hourly.

    Thus, you should not use windowedBy() at all, but just a "regular" non-windowed aggregation. After the aggregation, you can use suppress() to emit the result at regular intervals: https://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/kstream/Suppressed.html#untilTimeLimit-java.time.Duration-org.apache.kafka.streams.kstream.Suppressed.BufferConfig-
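
    A rough sketch of what this could look like follows. It is not a tested implementation and rests on several assumptions: the values on stream-logistic-movement are taken to be already-signed Long quantities (in the question's code, the mapValues step would produce them), the output topic name hourly-total-stock is made up, and the suppression buffer is left unbounded for simplicity. Note that suppress() is driven by stream time, so results are only emitted as new records arrive and will not necessarily line up with exact hour boundaries.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

class TotalStockTopology {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder
            // Assumption: key = item id, value = signed quantity (+ for ADD, - for removal).
            .stream("stream-logistic-movement", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // No windowedBy(): the sum runs over all records per key, i.e. the total stock.
            .reduce(Long::sum, Materialized.with(Serdes.String(), Serdes.Long()))
            // Hold back intermediate updates and forward the latest total per key
            // once the time limit has passed (measured in stream time).
            .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), BufferConfig.unbounded()))
            .toStream()
            // Hypothetical output topic name.
            .to("hourly-total-stock", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

    With the example data, the total for the item would go 50, 60, 70, 30, 10 as records arrive; suppress() only limits how often those updates are forwarded downstream.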