
Flink Table/SQL API: modify rowtime attribute after session window aggregation


I want to use a Session window aggregation and then run a Tumble window aggregation on top of its result in the Table API / Flink SQL.

Is it possible to modify the rowtime attribute after the first session aggregation so that it equals the .rowtime of the last observed event in the session?

I'm trying to do something like this:

table
  .window(Session withGap 2.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select(
    'userId,
    ('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
    ('w.rowtime - 2.minutes) as 'rowtime
  )
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('w)
  .select(
    'w.start,
    'w.end,
    'sessionDuration.avg as 'avgSession,
    'sessionDuration.count as 'numberOfSession
  )

The key part is:

('w.rowtime - 2.minutes) as 'rowtime

So I want to assign to each result record the .rowtime of the latest event in its session, i.e. the window's rowtime without the session gap (2.minutes in this example).

This works fine in BatchTable; however, it doesn't work in StreamTable:

Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

Yeah, I know, it feels like I want to invent a time machine and change the order of time. But is it actually possible to somehow achieve the described behaviour?


Solution

  • No, unfortunately, you cannot do that with SQL or the Table API in the current version (1.6.0). As soon as you modify a time attribute (rowtime or proctime), it becomes a regular TIMESTAMP attribute and loses its special time characteristics.

    For rowtime attributes the reason is that we cannot guarantee that the timestamp is still aligned with the watermarks. In principle, we could delay the watermarks by the subtracted time interval, but this is not supported yet.
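
    The alignment problem can be illustrated with a small plain-Scala sketch that does not use Flink at all. It assumes a simplified lateness rule (a record is late when its timestamp is not greater than the watermark already seen downstream) and a session whose last event is at 600000 ms; the names WatermarkAlignment, isLate and sessionRowtime are hypothetical and exist only for this illustration.

```scala
// Simplified model, not Flink code: shows why subtracting the gap from
// the window rowtime can put a record behind the watermark.
object WatermarkAlignment {
  // Session gap from the question: 2 minutes, in milliseconds.
  val GapMs: Long = 2 * 60 * 1000L

  // Assumed lateness rule: a watermark W promises that no record with
  // timestamp <= W will follow.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark

  // Rowtime of a session window result: window end minus 1 ms, where the
  // window end is the last event timestamp plus the gap.
  def sessionRowtime(lastEventTs: Long): Long =
    lastEventTs + GapMs - 1

  def main(args: Array[String]): Unit = {
    val lastEventTs = 600000L
    val rowtime = sessionRowtime(lastEventTs)

    // Downstream may already have seen any watermark below the rowtime
    // before the session window fires.
    val downstreamWatermark = rowtime - 1

    // The unmodified rowtime is still ahead of that watermark.
    println(s"unmodified late: ${isLate(rowtime, downstreamWatermark)}")

    // After subtracting the gap, the record falls behind the watermark.
    val shifted = rowtime - GapMs
    println(s"shifted late: ${isLate(shifted, downstreamWatermark)}")

    // Delaying the watermark by the same interval restores alignment.
    val delayedWatermark = downstreamWatermark - GapMs
    println(s"shifted vs delayed watermark late: ${isLate(shifted, delayedWatermark)}")
  }
}
```

    In this model the shifted timestamp is only safe once the watermark is held back by the same 2 minutes, which is the unsupported step mentioned above.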