I want to use a Session window aggregation and then run a Tumble window aggregation on top of the produced result in the Table API/Flink SQL.
Is it possible to modify the rowtime attribute after the first session aggregation so that it equals the .rowtime of the last observed event in a session?
I'm trying to do something like this:
    table
      .window(Session withGap 2.minutes on 'rowtime as 'w)
      .groupBy('w, 'userId)
      .select(
        'userId,
        ('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
        ('w.rowtime - 2.minutes) as 'rowtime
      )
      .window(Tumble over 5.minutes on 'rowtime as 'w)
      .groupBy('w)
      .select(
        'w.start,
        'w.end,
        'sessionDuration.avg as 'avgSession,
        'sessionDuration.count as 'numberOfSession
      )
The key part is:
    ('w.rowtime - 2.minutes) as 'rowtime
So I want to re-assign to each record the .rowtime of the latest event in the session, without the session gap (2.minutes in this example).
This works fine in BatchTable; however, it doesn't work in StreamTable:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
Yeah, I know, it feels like I want to invent a time machine and change the order of time. But is it actually possible to somehow achieve the described behaviour?
No, unfortunately, you cannot do that with SQL or the Table API in the current version (1.6.0). As soon as you modify a time attribute (rowtime or proctime), it becomes a regular TIMESTAMP attribute and loses its special time characteristics.
For rowtime attributes, the reason is that we cannot guarantee that the timestamp is still aligned with the watermarks. In principle, we could delay the watermarks by the subtracted time interval, but this is not supported yet.
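To make the alignment problem concrete, here is a minimal sketch in plain Scala. It is not Flink code: the object `WatermarkAlignmentSketch` and its helpers `tumbleEnd` and `isLate` are hypothetical names for a simplified model. The model assumes a session result is emitted once the watermark reaches the session window's last timestamp (window end minus 1 ms), and that a record is late for a tumbling window once the watermark has reached that window's last timestamp.

```scala
// Simplified model of event-time lateness. This is an illustration only,
// not Flink API code; all names here are hypothetical.
object WatermarkAlignmentSketch {
  val gapMs: Long = 2 * 60 * 1000L     // session gap from the question (2 minutes)
  val tumbleMs: Long = 5 * 60 * 1000L  // tumbling window size from the question (5 minutes)

  /** Exclusive end of the tumbling window containing `ts` (assumes ts >= 0). */
  def tumbleEnd(ts: Long): Long = ts - (ts % tumbleMs) + tumbleMs

  /** Assumed rule: a record is late once the watermark has reached
    * the last timestamp (end - 1) of the tumbling window it belongs to. */
  def isLate(ts: Long, watermark: Long): Boolean = watermark >= tumbleEnd(ts) - 1

  def main(args: Array[String]): Unit = {
    val lastEvent = 4 * 60 * 1000L + 30 * 1000L // last event of the session at 00:04:30
    val sessionEnd = lastEvent + gapMs          // session window closes at 00:06:30
    val windowRowtime = sessionEnd - 1          // modelled 'w.rowtime
    val watermarkAtEmit = windowRowtime         // earliest watermark that emits the session result
    val shifted = windowRowtime - gapMs         // modelled ('w.rowtime - 2.minutes)

    // Unmodified rowtime: the tumbling window [00:05, 00:10) is still open.
    println(s"unmodified late: ${isLate(windowRowtime, watermarkAtEmit)}")
    // Shifted rowtime: the tumbling window [00:00, 00:05) was already closed by the watermark.
    println(s"shifted late: ${isLate(shifted, watermarkAtEmit)}")
    // Delaying the watermark by the same interval restores the alignment.
    println(s"shifted, delayed watermark late: ${isLate(shifted, watermarkAtEmit - gapMs)}")
  }
}
```

In this model the shifted timestamp falls into a tumbling window that the watermark has already passed, which is why the modified attribute can no longer be treated as a rowtime attribute. Holding the watermark back by the subtracted interval removes the lateness, matching the "in principle" remark above.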