Tags: apache-kafka-streams, ksqldb

How does Kafka Streams windowing work?


I am having a hard time comprehending how Windowing works in Kafka Streams. The results don't seem to align with what I have read and understood so far.

I have created a KSQL stream with a backing topic. One of the 'columns' in the KSQL SELECT statement has been designated as the TIMESTAMP for the topic.

CREATE STREAM my_stream WITH (KAFKA_TOPIC='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS SELECT <select list> FROM <source> PARTITION BY PARTITION_KEY;

Records in my-stream-topic are grouped by the key (PARTITION_KEY) and windowed with a hopping window

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream("my-stream-topic", consumed)
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))
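
To double-check the window arithmetic itself: a 5-minute window advancing by 1 minute assigns every record to five overlapping windows, and TimeWindows exposes windowsFor, which returns all windows containing a given timestamp. A minimal standalone sketch (same pre-2.x API as above; the WindowCheck name is just illustrative):

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.TimeWindows
import scala.collection.JavaConverters._

object WindowCheck extends App {
  val windows = TimeWindows
    .of(TimeUnit.MINUTES.toMillis(5))
    .advanceBy(TimeUnit.MINUTES.toMillis(1))

  // 07:02 on the epoch day; hopping window boundaries are epoch-aligned.
  val recordTs = TimeUnit.HOURS.toMillis(7) + TimeUnit.MINUTES.toMillis(2)

  // Prints the five windows 06:58-07:03 through 07:02-07:07 (as epoch millis),
  // i.e. a record timestamped 07:02 also belongs to the 7:00 - 7:05 window.
  windows.windowsFor(recordTs).values.asScala.foreach { w =>
    println(s"[${w.start()} .. ${w.end()})")
  }
}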

Records are aggregated via

val dataAgg: KTable[Windowed[String], TopicStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] { <code omitted> },
      new Aggregator[String, TopicValue, TopicStats] { <code omitted> },
      Materialized.as[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)
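
(The omitted initializer and aggregator follow the usual shape; for illustration only, here is a sketch. TopicValue is the question's own type, and the count/sum fields and the metric field are made up for the example:)

import org.apache.kafka.streams.kstream.{Aggregator, Initializer}

// Hypothetical stats type: a count plus a running sum of one numeric field.
case class TopicStats(count: Long, sum: Double)

val initializer = new Initializer[TopicStats] {
  override def apply(): TopicStats = TopicStats(0L, 0.0)
}

val aggregator = new Aggregator[String, TopicValue, TopicStats] {
  override def apply(key: String, value: TopicValue, agg: TopicStats): TopicStats =
    TopicStats(agg.count + 1, agg.sum + value.metric) // `metric` is a made-up field
}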

I then print to console via

dataAgg.print()
topicStats.print()

The first window in the group translates to 7:00 - 7:05.

When I examine the records in my-stream-topic via a console consumer I see that there are 2 records that should fall within the above window. However, only 1 of them is picked up by the aggregator.

I thought that the dataAgg windowed KTable would contain one record per grouped key per window, and that the aggregate would have been computed from both records. The aggregate value that is printed is therefore incorrect.

What am I missing?


Solution

  • KSQL can set record timestamps on write; however, you need to specify the timestamp when creating the input stream, not when defining the output stream. That is, the timestamp specified for the input stream is used to set the record's metadata timestamp field on write.

    This behavior is rather unintuitive and I opened a ticket for this issue: https://github.com/confluentinc/ksql/issues/1367

    Thus, you need to specify the WITH (TIMESTAMP='recorded_timestamp') clause when creating the input stream for the query you showed in the question. If this is not possible because your query needs to operate on a different timestamp, you need to specify a second query that copies the data into a new topic.

    CREATE STREAM my_stream_with_ts
        WITH (KAFKA_TOPIC='my-stream-topic-with-ts')
    AS SELECT * FROM my_stream PARTITION BY PARTITION_KEY;
    

    As an alternative, you can set a custom timestamp extractor for your Kafka Streams application to extract the timestamp from the payload.
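
    A minimal sketch of that extractor, assuming the deserialized value is the TopicValue from the question and exposes the recorded timestamp as an epoch-millis field (the recordedTimestamp field and the class name are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.processor.TimestampExtractor

    // Illustrative extractor: read the timestamp from the deserialized payload.
    class PayloadTimestampExtractor extends TimestampExtractor {
      override def extract(record: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long =
        record.value() match {
          case v: TopicValue => v.recordedTimestamp // assumed epoch-millis field
          case _             => record.timestamp()  // fall back to the record's metadata timestamp
        }
    }

    // Register it via the Streams config, e.g.:
    // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    //           classOf[PayloadTimestampExtractor].getName)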