Tags: apache-kafka, apache-kafka-streams

Behavior of aggregation on Kafka's SessionWindows


I am trying to compute an aggregation on Kafka's SessionWindows using a class MyVeryCustomAggregator, which stores the aggregated information as attributes and provides the methods aggregate and result for processing new messages and retrieving the final result, respectively.

My main difficulty lies in implementing the Merger interface required by the aggregate method, i.e. MyVeryCustomAggregator::mergeWith. The aggregation I am computing is highly dependent on the order of messages and their timestamps, so I cannot simply add the two aggregators together as in the Confluent documentation, which only states:

When windowing is based on sessions, you must additionally provide a “session merger” aggregator (e.g., mergedAggValue = leftAggValue + rightAggValue).

When using session windows: the session merger is called whenever two sessions are being merged.
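
For a simple count or sum, that documented merger would indeed be trivial. A purely illustrative example (the Long aggregate and String key types here are assumed, not part of my actual code):

import org.apache.kafka.streams.kstream.Merger;

// For a plain sum, merging two partial session aggregates is just addition,
// independent of the order in which they are passed in.
Merger<String, Long> sumMerger =
    (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue;

My order-dependent aggregation has no such commutative operation.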

Any further information on the following questions would be highly appreciated:

  1. Why do aggregations need to be merged at all? Since I am not using a grace period, I would expect the aggregation to be computed only once, when a session is closed.
    • Even if out-of-order handling is necessary, I would expect the previous aggregator to be loaded and used for further processing.
  2. What arguments are passed to the Merger (agg1 and agg2)? Are these, for example, two consecutively computed aggregations, or can Kafka merge a sequence of aggregations in an arbitrary order?

Here is an example implementation, to make it easier to refer to in answers:

builder.stream("INPUT_TOPIC", Consumed.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
    .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
    // sessions close after 15 seconds of inactivity, with no grace period
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(15)))
    .aggregate(
        MyVeryCustomAggregator::new,                     // initializer
        (key, newValue, agg) -> agg.aggregate(newValue), // aggregator: one record at a time
        (aggKey, agg1, agg2) -> agg1.mergeWith(agg2),    // session merger
        Named.as("MyVeryCustomAggregator"),
        Materialized.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomAggregator.class))
    )
    .toStream()
    .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg.result()))
    .to("OUTPUT_TOPIC", Produced.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomOutput.class)));

Solution

  • Each incoming record results in a new session (a SessionWindow) whose start and end times are based on that record's timestamp. Kafka Streams then searches for all existing session windows for the given key in the timestamp range from the current record timestamp - window.inactivityGap to the current record timestamp + window.inactivityGap. In practice it will usually find at most one session window: as long as records keep arriving within the inactivity gap, they are merged into the existing session, so it continues to grow in size.

    Since the previous sessions are fetched by time, you are guaranteed that the merger is applied in order of arrival and that the merged aggregate is computed in time order; it is not arbitrary.

    The two parameters agg1 and agg2 are the current aggregate and the next aggregate to be combined into the overall aggregate for the session window. As per the previous paragraph, since merging is applied in time order, they are indeed consecutive and not arbitrary (see the sketch below).

    HTH
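
As a concrete follow-up on the ordering concern: since merges happen whenever a record bridges two sessions, one option is to make the aggregator itself merge-tolerant by buffering events together with their timestamps and deferring the order-sensitive computation to result(). A minimal sketch under those assumptions; everything beyond the names in the question, including the getTimestamp() accessor on the message, is illustrative:

import java.util.ArrayList;
import java.util.List;

public class MyVeryCustomAggregator {

    // Events are buffered with their timestamps so that ordering can be
    // reconstructed no matter how sessions are merged.
    public static class TimestampedEvent {
        public long timestamp;
        public MyVeryCustomMessage message;

        public TimestampedEvent() { } // no-arg constructor for the JSON serde

        public TimestampedEvent(long timestamp, MyVeryCustomMessage message) {
            this.timestamp = timestamp;
            this.message = message;
        }
    }

    public List<TimestampedEvent> events = new ArrayList<>();

    // Called once per incoming record of a session.
    public MyVeryCustomAggregator aggregate(MyVeryCustomMessage newValue) {
        // getTimestamp() is an assumed accessor on the message
        events.add(new TimestampedEvent(newValue.getTimestamp(), newValue));
        return this;
    }

    // Called when two sessions are merged: concatenate and re-sort, so the
    // outcome does not depend on which argument was agg1 or agg2.
    public MyVeryCustomAggregator mergeWith(MyVeryCustomAggregator other) {
        events.addAll(other.events);
        events.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
        return this;
    }

    // The order-sensitive computation runs only here, over the final,
    // timestamp-sorted sequence of events for the session.
    public MyVeryCustomOutput result() {
        // defensive final sort: records within a single session may also
        // have arrived out of timestamp order
        events.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
        MyVeryCustomOutput output = new MyVeryCustomOutput();
        for (TimestampedEvent event : events) {
            // ... apply the order-dependent logic to 'output' ...
        }
        return output;
    }
}

Because mergeWith re-sorts by timestamp, the merge is effectively commutative: whichever aggregate Kafka Streams passes as agg1 or agg2, the buffered events end up in the same order, and result() always runs over the complete, time-ordered session.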