
Access Record Offset in Kafka Streams Aggregator


I have a simple windowing topology:

builder.stream("input-topic", Consumed.with(...))
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .transformValues(FrameTransformer::new)
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

I'd like to put the actual record offset of the beginning of the window into the aggregating Frame object.

How can I get access to the record offset from the windowAggregator (aggregate() handler) function?

I know that I can get access to the record offset in the FrameTransformer, but that doesn't help me create accurate Frame objects that describe my windows in terms of start and end offsets.

I've heard there's a way of doing this by inserting another .transform() call before the groupByKey(); there I can access the offsets, but then I'd need to modify the schema of my event records to store the offset information inside them.

Is there a (simpler) way of achieving my intention?

Update

In fact, I was able to get the accurate window start and end offsets into the Frame objects in the following way:

builder.stream("input-topic", Consumed.with(...))
    .transformValues(EventTransformer::new)
    .groupByKey()
    .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
    .aggregate(Frame::new,
            this::windowAggregator,
            ...
    )
    .toStream()
    .selectKey((key, value) -> value...)
    .to("output-topic", Produced.with(...));

But as mentioned above, this comes at the expense of editing the schema of the Event object.
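
To make the update concrete, here is a minimal sketch of what such an EventTransformer could look like. It reads the offset from the ProcessorContext and copies it into the value before the aggregation sees it; the Event type and its setOffset(long) field are the assumed schema addition mentioned above, and all names are illustrative, not from the original code.

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative sketch: enrich each Event with the offset of the record it came from.
// Assumes Event has been extended with a setOffset(long) field.
public class EventTransformer implements ValueTransformer<Event, Event> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // The context exposes metadata (topic, partition, offset, timestamp)
        // of the record currently being processed.
        this.context = context;
    }

    @Override
    public Event transform(Event event) {
        event.setOffset(context.offset());
        return event;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}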


Solution

  • How can I get access to the record offset from the windowAggregator (aggregate() handler) function?

    You can't. Your approach of using transformValues() before the aggregation (and enriching the Event object with the offset) is the right one; the sketch after this answer shows how the aggregator can then pick the offsets up.

    There was a proposal to extend the API to allow accessing record metadata within aggregate() and other DSL operators, but it was never pushed over the finish line (cf. KIP-159: https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams).
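
    For completeness, a minimal sketch of how the windowAggregator could fold the per-record offsets into the Frame. Frame's startOffset/endOffset accessors (assumed to be nullable Long fields), Event#getOffset, and the String key type are assumptions for illustration, not part of the original code.

    // Illustrative aggregator: track the smallest and largest offset seen in the window.
    private Frame windowAggregator(String key, Event event, Frame frame) {
        long offset = event.getOffset();
        // The smallest offset seen so far marks the start of the window ...
        if (frame.getStartOffset() == null || offset < frame.getStartOffset()) {
            frame.setStartOffset(offset);
        }
        // ... and the largest one seen so far marks its (current) end.
        if (frame.getEndOffset() == null || offset > frame.getEndOffset()) {
            frame.setEndOffset(offset);
        }
        // ... plus whatever other aggregation the Frame performs
        return frame;
    }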