I have a warehouse application in which I need to calculate the total stock on an hourly basis.
All item movement data (additions / removals) is sent to a Kafka stream.
This means I can get the hourly aggregated movement using a windowed Kafka stream, like this:
sourceStream
    .mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .reduce(Long::sum, Materialized.with(stringSerde, longSerde))
    .toStream()
    .to("hourly-movement");
But how can I get the total stock based on this aggregated result?
For example, with this data set, assuming the starting stock is zero:
The aggregated stream result (by window) is this:
I need to draw an hourly chart in the frontend, which means I need this data set:
How can I get such a data set? The original source stream is from stream-logistic-movement.
It seems you don't want a windowed aggregation, but an overall aggregation whose current result is emitted hourly.
Thus, you should not use windowedBy() at all, but just a "regular" non-windowed aggregation. After the aggregation, you could use suppress() to emit the result at regular intervals: https://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/kstream/Suppressed.html#untilTimeLimit-java.time.Duration-org.apache.kafka.streams.kstream.Suppressed.BufferConfig-
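
To make the difference concrete, here is a minimal plain-Java sketch of the semantics, deliberately without the Kafka Streams API so it runs on its own. The running map plays the role of a non-windowed groupByKey().reduce(...), and the hourly snapshot stands in for what suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), ...)) is meant to achieve. The class RunningStockSketch, the Movement type, the item name and all sample values are made up for illustration; they are not from the question. It also simplifies one thing: it snapshots every item at every hour, whereas suppress() is driven by incoming updates, so treat it as a model of the desired output rather than of Kafka's exact emit behavior.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RunningStockSketch {

    /** Hypothetical movement event; type and quantity mirror getType()/getQuantity() in the question. */
    static final class Movement {
        final Instant timestamp;
        final String item;
        final String type;
        final long quantity;

        Movement(String timestamp, String item, String type, long quantity) {
            this.timestamp = Instant.parse(timestamp);
            this.item = item;
            this.type = type;
            this.quantity = quantity;
        }
    }

    /** Same sign rule as the mapValues() step: "ADD" counts as positive, anything else as negative. */
    static long signedQuantity(Movement m) {
        return m.type.equalsIgnoreCase("ADD") ? m.quantity : -m.quantity;
    }

    /**
     * Returns, for each full hour boundary in (start, end], the total stock per item at that boundary.
     * The per-item totals are never reset between hours, which is what distinguishes this
     * from a windowed aggregation. Movements must be sorted by timestamp.
     */
    static Map<Instant, Map<String, Long>> hourlyTotals(List<Movement> movements, Instant start, Instant end) {
        Duration hour = Duration.ofHours(1);
        Map<String, Long> totals = new LinkedHashMap<>(); // running state, like a non-windowed reduce()
        Map<Instant, Map<String, Long>> snapshots = new LinkedHashMap<>();
        int next = 0;
        for (Instant hourEnd = start.plus(hour); !hourEnd.isAfter(end); hourEnd = hourEnd.plus(hour)) {
            // Fold in every movement that happened before this hour boundary.
            while (next < movements.size() && movements.get(next).timestamp.isBefore(hourEnd)) {
                Movement m = movements.get(next++);
                totals.merge(m.item, signedQuantity(m), Long::sum);
            }
            // Record a copy of the current totals once per hour.
            snapshots.put(hourEnd, new LinkedHashMap<>(totals));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the question.
        List<Movement> movements = Arrays.asList(
            new Movement("2020-01-01T08:10:00Z", "apple", "ADD", 10),
            new Movement("2020-01-01T08:40:00Z", "apple", "REMOVE", 3),
            new Movement("2020-01-01T09:15:00Z", "apple", "ADD", 5),
            new Movement("2020-01-01T10:30:00Z", "apple", "REMOVE", 2));
        Map<Instant, Map<String, Long>> result = hourlyTotals(
            movements, Instant.parse("2020-01-01T08:00:00Z"), Instant.parse("2020-01-01T11:00:00Z"));
        result.forEach((hourEnd, totals) -> System.out.println(hourEnd + " " + totals));
    }
}
```

With this sample input, a windowed aggregation would yield the per-hour net movements 7, 5 and -2, while the running totals recorded at 09:00, 10:00 and 11:00 are 7, 12 and 10. The second series is the one an hourly stock chart needs.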