More than one tumbling window


I am trying to understand whether I can achieve the following behavior with Kafka Streams, with grace configuration or in any other way:

  • Tumbling window of 1 min
  • Time is taken from a field in the message, let's call it the messageTime field
  • Messages are sent from multiple sources and should be aggregated for each window
  • Some messages may occasionally be delayed, so a message that arrives up to 2 minutes late must still be processed, and it should go into the correct window based on the value of the messageTime field.
  • At 10:02:15 I want to have 3 windows that can receive data: 10:00-10:01, 10:01-10:02, 10:02-10:03
  • If a message with messageTime value 10:00:00 arrives at 10:02:15 I want it to go into the 10:00-10:01 window
  • If data with messageTime value 10:01:00 arrives at 10:02:15 I want it to go into 10:01-10:02 window
  • If data with messageTime value 10:02:00 arrives at 10:02:14 I want it to go into 10:02-10:03 window
  • If data with messageTime value 10:00:00 arrives at 10:03:01 I want it to be discarded (since it is beyond the 1 min window plus 2 mins grace)
  • I want window 10:00-10:01 to be closed and flushed at 10:03 (1 min + 2 mins grace)
  • I want window 10:01-10:02 to be closed and flushed at 10:04 (1 min + 2 mins grace)
  • I want window 10:02-10:03 to be closed and flushed at 10:05 (1 min + 2 mins grace)

Solution

  • Yes. Use TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1L), Duration.ofMinutes(2L)) to define a 1-minute tumbling window with a 2-minute grace period.

    Time is taken from a field in the message, let's call it the messageTime field

    You can write a custom TimestampExtractor that returns messageTime as the record timestamp, and register it via StreamsConfig (DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).

    If you only want a single final result when a window closes, you can use either suppress(Suppressed.untilWindowCloses(...)) on the KTable result, or windowedBy(...).emitStrategy(EmitStrategy.onWindowClose()).aggregate(...). (Note that windows are only closed when "stream time", the highest record timestamp seen so far, advances past their close time, which is the window end plus the grace period. If input data stops flowing, stream time stops advancing and windows are kept open.)
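
To make the stream-time rule concrete, here is a minimal sketch in plain Java. It does not use Kafka Streams at all: the class TumblingWindowSketch and its methods (at, process, isClosed, countFor) are hypothetical names invented for this illustration, and it only models the rule described above, assuming a record is dropped once stream time has reached window end plus grace. Note that "arrival time" plays no role here; only the messageTime values seen so far do.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.TreeMap;

// Hypothetical, stdlib-only model of tumbling windows with a grace period.
// This is NOT Kafka Streams code; it only mirrors the drop rule described above.
final class TumblingWindowSketch {
    private final long sizeMs;
    private final long graceMs;
    // Highest record timestamp seen so far ("stream time"), not wall-clock time.
    private long streamTimeMs = Long.MIN_VALUE;
    // Window start (ms) -> number of records aggregated into that window.
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    TumblingWindowSketch(Duration size, Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Helper: "HH:mm:ss" -> milliseconds since midnight, standing in for messageTime.
    static long at(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    // Start of the tumbling window that contains the given timestamp.
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // A window is closed once stream time has reached window end + grace.
    boolean isClosed(long windowStartMs) {
        return streamTimeMs >= windowStartMs + sizeMs + graceMs;
    }

    // Returns true if the record was added to its window, false if it was dropped as too late.
    boolean process(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = windowStart(timestampMs);
        if (isClosed(start)) {
            return false;
        }
        counts.merge(start, 1, Integer::sum);
        return true;
    }

    int countFor(long windowStartMs) {
        return counts.getOrDefault(windowStartMs, 0);
    }

    public static void main(String[] args) {
        TumblingWindowSketch w =
            new TumblingWindowSketch(Duration.ofMinutes(1), Duration.ofMinutes(2));
        // messageTime values in the order they are processed.
        String[] messageTimes = {"10:02:14", "10:00:00", "10:01:00", "10:03:01", "10:00:00"};
        for (String t : messageTimes) {
            System.out.println(t + " -> " + (w.process(at(t)) ? "accepted" : "dropped"));
        }
    }
}
```

In this run the first 10:00:00 record is accepted because stream time is only 10:02:14, while the second one is dropped because the 10:03:01 record has already pushed stream time past 10:03:00, the close time of the 10:00-10:01 window. If no record with a later messageTime had arrived, that window would have stayed open regardless of the wall clock.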