I have a simple windowing topology:
```java
builder.stream("input-topic", Consumed.with(...))
        .groupByKey()
        .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
        .aggregate(Frame::new,
                this::windowAggregator,
                ...
        )
        .transformValues(FrameTransformer::new)
        .toStream()
        .selectKey((key, value) -> value...)
        .to("output-topic", Produced.with(...));
```
I'd like to put the actual record offset of the beginning of the window into the Frame aggregating object.
How can I get access to the record offset from the windowAggregator (aggregate() handler) function?
I know that I can access the record offset in the FrameTransformer, but that doesn't help me create accurate Frame objects that describe my windows in terms of start and end offsets.
I've heard that this can be done by inserting another .transform() call before groupByKey(), where the offsets are accessible, but then I'd need to modify the schema of my event records to store the offset information in them.
Is there a (simpler) way of achieving my intention?
Update
In fact, I was able to get accurate window start and end offsets into the Frame objects in the following way:
```java
builder.stream("input-topic", Consumed.with(...))
        .transformValues(EventTransformer::new)
        .groupByKey()
        .windowedBy(TimeWindows.of(windowSize).advanceBy(windowAdvance).grace(windowGrace))
        .aggregate(Frame::new,
                this::windowAggregator,
                ...
        )
        .toStream()
        .selectKey((key, value) -> value...)
        .to("output-topic", Produced.with(...));
```
But, as mentioned above, this comes at the expense of changing the schema of the Event object.
> How can I get access to the record offset from the windowAggregator (aggregate() handler) function?
You can't. Your approach of using transformValues() before the aggregation (and enriching the Event object) is the right one.
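A Kafka-free sketch of the aggregation side of that approach, assuming the transformValues() step stores `context.offset()` (from the `ProcessorContext` it receives in `init()`) next to each value. To leave the `Event` schema untouched, it uses a hypothetical wrapper `OffsetEvent<E>` instead of a new field on `Event`; `Frame` and `FrameOffsets.windowAggregator` are likewise hypothetical stand-ins that show only the offset bookkeeping. Taking the min/max offset, rather than the first/last one seen, means the result does not depend on the order in which records reach a window.

```java
import java.util.Objects;

/** Hypothetical wrapper: pairs an unchanged event with the offset read in the transformValues() step. */
final class OffsetEvent<E> {
    final E event;
    final long offset;

    OffsetEvent(E event, long offset) {
        this.event = Objects.requireNonNull(event);
        this.offset = offset;
    }
}

/** Hypothetical stand-in for Frame: only the offset bookkeeping is shown. */
final class Frame {
    long startOffset = Long.MAX_VALUE; // smallest offset seen in this window so far
    long endOffset = Long.MIN_VALUE;   // largest offset seen in this window so far
    long count = 0;                    // number of records aggregated into this window
}

public class FrameOffsets {
    // Same shape as the body of an Aggregator<K, V, VA>#apply(key, value, aggregate).
    static <K, E> Frame windowAggregator(K key, OffsetEvent<E> value, Frame frame) {
        frame.startOffset = Math.min(frame.startOffset, value.offset);
        frame.endOffset = Math.max(frame.endOffset, value.offset);
        frame.count++;
        return frame;
    }

    public static void main(String[] args) {
        Frame frame = new Frame(); // what the Frame::new initializer would supply
        long[] offsets = {42L, 43L, 47L};
        for (long offset : offsets) {
            frame = windowAggregator("sensor-1", new OffsetEvent<>("payload", offset), frame);
        }
        // prints "42..47 (3 records)"
        System.out.println(frame.startOffset + ".." + frame.endOffset + " (" + frame.count + " records)");
    }
}
```

The wrapper still needs a serde wherever Kafka Streams has to serialize it (for example, in `Grouped` if a repartition is introduced), so it trades a schema change for an extra serde. Offsets are only comparable within one partition; this holds here as long as the input topic is partitioned by key and the key is not changed before groupByKey().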
There was a proposal to extend the API to allow access to record metadata within aggregate() and other DSL operators, but it was never pushed over the finish line (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams).