Tags: java, apache-flink, flink-sql

Flink Autojoin with rowtime column


I have a Flink table with the following structure:

Id1, Id2, myTimestamp, value

Where the rowtime is based on myTimestamp.

I have the following processing that works well:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");

I want to adapt the previous code so that, for each window, only the latest record per Id2 is used. So I thought that changing the code as follows would work:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable, " +
                "(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest " +
                "WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.latestTimestamp " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");

But when I do so, I get the following error:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

It looks like Flink does not 'understand' that the two tables I am joining are the same one.

How can I do what I want to do?


Solution

  • There are a few reasons why your query does not work.

    SELECT 
      Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value 
    FROM 
      MyTable, 
      (SELECT Id2, MAX(myTimestamp) as latestTimestamp 
       FROM MyTable 
       GROUP BY Id2
      ) as RecordsLatest
    WHERE 
      MyTable.Id2 = RecordsLatest.Id2 
      AND MyTable.myTimestamp = RecordsLatest.latestTimestamp
    GROUP BY 
      Id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
    

    Some are due to limitations in Flink, others are more fundamental.

    1. latestTimestamp is no longer a rowtime attribute, because it is computed. As soon as you use a rowtime attribute in an expression (incl. aggregation functions such as MAX), it loses its rowtime property and becomes a regular TIMESTAMP attribute.
    2. The inner query produces a dynamic table that updates its results; it is not an append-only table. As soon as the maximum timestamp for an Id2 changes, the previous result row needs to be retracted and a new result row inserted.
    3. Since RecordsLatest is an updating table (and not an append-only table) and latestTimestamp is not a rowtime attribute, the join of RecordsLatest and MyTable is a "regular join" (and not a time-windowed join), which also produces an updating result rather than an append-only one. A regular join cannot preserve any rowtime attributes, because there is no guarantee about the order of the output rows (a prerequisite for rowtime attributes, which need to be aligned with watermarks) and result rows might need to be removed later. This is what causes the error message you see.
    4. The GROUP BY clause of the outer query requires an append-only input table with a rowtime attribute. However, the output of the join is updating rather than append-only, and its timestamp attribute cannot be a rowtime attribute, as explained above.
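Point 2 can be illustrated without Flink. Below is a minimal plain-Java sketch (not Flink API; the class name and methods are hypothetical) of maintaining MAX(myTimestamp) per Id2: whenever the maximum for a key changes, a streaming SQL engine would have to retract the previously emitted result row before inserting the new one, which is exactly what makes the inner query's result an updating table.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch: the state behind SELECT Id2, MAX(myTimestamp) ... GROUP BY Id2.
// When the max for a key changes, the old result row must be retracted -- so the
// result is an updating table, not an append-only one.
public class MaxPerKeySketch {
    private final Map<String, Long> latest = new HashMap<>();

    /** Returns true if an existing result row for this key changed (i.e. a retraction is needed). */
    public boolean accumulate(String id2, long timestamp) {
        Long prev = latest.get(id2);
        if (prev == null || timestamp > prev) {
            latest.put(id2, timestamp);
            return prev != null; // a previous result row existed and must be retracted
        }
        return false; // result unchanged, nothing to retract
    }

    public long max(String id2) {
        return latest.get(id2);
    }

    public static void main(String[] args) {
        MaxPerKeySketch sketch = new MaxPerKeySketch();
        sketch.accumulate("a", 1000L);                    // first row for "a": plain insert
        boolean retract = sketch.accumulate("a", 2000L);  // max changes: retract + insert
        System.out.println(retract);
        System.out.println(sketch.max("a"));
    }
}
```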

    Solving your task is unfortunately not straightforward but should be possible.

    First of all, you need a query that returns, for each (Id1, Id2) window, the value of the row with the maximum timestamp:

    SELECT 
      Id1, Id2,
      MAX(myTimestamp) AS maxT,
      ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
      HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
    FROM
      MyTable
    GROUP BY
      Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
    

    The ValOfMaxT function is a user-defined aggregate function that identifies the value belonging to the maximum timestamp and returns it. rowtime is the new rowtime attribute and lies 1 ms before the end timestamp of the window.
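ValOfMaxT is not a built-in function; in Flink it would be implemented as a user-defined aggregate function (a subclass of Flink's AggregateFunction). The core accumulator logic is simply "keep the value seen together with the largest timestamp" — here is a plain-Java sketch of that logic, with the Flink wrapper class and type information omitted (class and method names are illustrative):

```java
// Sketch of the accumulator behind a hypothetical ValOfMaxT UDAF: it tracks the
// value that was observed together with the maximum timestamp seen so far.
// In real Flink code this logic would live inside an AggregateFunction subclass.
public class ValOfMaxTAccumulator {
    private long maxTimestamp = Long.MIN_VALUE;
    private double valOfMaxT = 0.0;

    public void accumulate(double value, long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            valOfMaxT = value;   // remember the value of the newest row
        }
        // rows with older timestamps are ignored
    }

    public double getValue() {
        return valOfMaxT;
    }

    public static void main(String[] args) {
        ValOfMaxTAccumulator acc = new ValOfMaxTAccumulator();
        acc.accumulate(5.0, 1000L);
        acc.accumulate(7.0, 3000L);
        acc.accumulate(9.0, 2000L); // older timestamp, ignored
        System.out.println(acc.getValue());
    }
}
```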

    Given this table, let's call it Temp, you can define the next query as:

    
    SELECT
      Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT) as value
    FROM Temp
    GROUP BY
      Id1, TUMBLE(rowtime, INTERVAL '10' SECOND)
    

    This query only groups on Id1 and a TUMBLE window. It is a TUMBLE window because the first HOP window has already assigned each record to three windows, and we should not do that again. Instead, we group the result of the first query into 10-second tumbling windows, because 10 seconds is the slide length of the HOP windows in the first query.
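The window arithmetic behind that statement can be checked directly: with a 30-second size and a 10-second slide, every record falls into exactly three hopping windows, and the windows start at 10-second boundaries — which is why the second query can safely use 10-second tumbling windows. A small plain-Java sketch (no Flink; class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Computes the start timestamps of all HOP(slide=10s, size=30s) windows
// that contain a given record timestamp (all values in milliseconds).
public class HopWindows {
    static final long SIZE = 30_000L;
    static final long SLIDE = 10_000L;

    public static List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % SLIDE); // latest window starting at or before ts
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start); // window [start, start + SIZE) contains ts
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 25 s belongs to the windows starting at 20 s, 10 s and 0 s.
        System.out.println(windowStartsFor(25_000L));
    }
}
```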