I am trying to compute an aggregation over Kafka's SessionWindows using a class MyVeryCustomAggregator, which stores aggregated information in its attributes and provides the methods aggregate and result for processing new messages and retrieving the final result, respectively. My main difficulty lies in implementing the Merger interface required by the aggregate method, i.e. MyVeryCustomAggregator::mergeWith. The aggregation I am computing depends heavily on the order of messages and their timestamps, so I cannot simply add the two aggregators together as in the Confluent documentation, which only states:
When windowing is based on sessions, you must additionally provide a “session merger” aggregator (e.g., mergedAggValue = leftAggValue + rightAggValue).
When using session windows: the session merger is called whenever two sessions are being merged.
Any further information on the following questions would be highly appreciated: In which order does Kafka merge two aggregations (i.e. which is passed as agg1 and which as agg2)? Are these e.g. two consecutively computed aggregations, or can a sequence of aggregations be merged in an arbitrary order by Kafka? Below is an example implementation to more easily refer to in answers.
builder.stream("INPUT_TOPIC", Consumed.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
.groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(15)))
.aggregate(
MyVeryCustomAggregator::new,
(key, newValue, agg) -> agg.aggregate(newValue),
(aggKey, agg1, agg2) -> agg1.mergeWith(agg2),
Named.as("MyVeryCustomAggregator"),
Materialized.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomAggregator.class))
)
.toStream()
.map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg.result()))
.to("OUTPUT_TOPIC", Produced.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomOutput.class)));
Each incoming record results in a new session (a SessionWindow) with a start and end time based on that record's timestamp. Kafka Streams then searches for all existing session windows for the given key in the timestamp range from the current record timestamp minus the inactivity gap to the current record timestamp plus the inactivity gap. Practically speaking, it will usually find a single session window, and the two get merged: as long as records arrive within the inactivity gap, the session keeps growing.
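As a rough illustration of that lookup range (plain Java, no Kafka dependency; the class and method names here are hypothetical, not the actual Kafka Streams internals):

```java
import java.time.Duration;

public class SessionLookupSketch {
    // Hypothetical helper: does an existing session [sessionStart, sessionEnd]
    // overlap the search range [ts - gap, ts + gap] of a new record at ts?
    // Timestamps are epoch milliseconds.
    static boolean merges(long sessionStart, long sessionEnd, long ts, Duration gap) {
        long g = gap.toMillis();
        return sessionEnd >= ts - g && sessionStart <= ts + g;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(15);
        // Existing session built from records between t=0s and t=10s.
        // A record at t=20s falls within the 15s gap of the session end,
        // so the session is extended.
        System.out.println(merges(0, 10_000, 20_000, gap)); // true
        // A record at t=40s is outside the gap: a new session is started.
        System.out.println(merges(0, 10_000, 40_000, gap)); // false
    }
}
```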
Since the previous sessions are fetched by time, you are guaranteed that the merger is applied in order of arrival and the merged aggregate is computed in time order; it is not arbitrary.
The two parameters agg1 and agg2 are the current aggregate and the next aggregate to be combined into the overall aggregate for the session window. As stated above, since merging is applied in time order, they are indeed consecutive and not arbitrary.
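Given that guarantee, an order-sensitive merger can assume agg1 covers the earlier session and agg2 the later one, and simply append. A minimal sketch (this class and its fields are hypothetical, standing in for the asker's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical order-sensitive aggregator: keeps message timestamps in
// arrival order so that result() can depend on the sequence, not just a sum.
public class OrderedAggregator {
    private final List<Long> timestamps = new ArrayList<>();

    // Corresponds to the Aggregator: fold one new message into this aggregate.
    public OrderedAggregator aggregate(long messageTimestamp) {
        timestamps.add(messageTimestamp);
        return this;
    }

    // Corresponds to the Merger: this aggregate holds the earlier session,
    // other the later one, so appending preserves time order.
    public OrderedAggregator mergeWith(OrderedAggregator other) {
        this.timestamps.addAll(other.timestamps);
        return this;
    }

    // Example of an order-dependent result: the span between the first
    // and the last message of the session.
    public long result() {
        return timestamps.get(timestamps.size() - 1) - timestamps.get(0);
    }
}
```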
HTH