apache-flink flink-streaming flink-sql

Apache Flink: How to enable "upsert mode" for dynamic tables?


I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I cannot find any examples or documentation showing how to enable this mode on a dynamic table.

Examples:

  • Blog post:

    When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.

  • Documentation:

    A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.

So my questions are:

  • How do I specify a unique key attribute on a dynamic table in Flink?
  • How do I place a dynamic table in update/upsert/"replace" mode, as opposed to append mode?

Solution

  • Update: since Flink 1.9, LAST_VALUE is one of the built-in aggregate functions when using the Blink planner (the default planner since Flink 1.11).

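    Assuming the Logins table from Fabian Hueske's answer above, we can now convert it into an upsert table simply by grouping on the key. The GROUP BY key (user) acts as the unique key of the result, so each new login for a user updates that user's row instead of appending a new one: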

    SELECT
      `user`,
      LAST_VALUE(loginTime) AS loginTime,
      LAST_VALUE(ip) AS ip
    FROM Logins
    GROUP BY `user`
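    For the first question (declaring the unique key explicitly), newer Flink versions also let you declare it on a table in DDL: PRIMARY KEY ... NOT ENFORCED is supported since Flink 1.11, and the upsert-kafka connector since Flink 1.12. The grouped query can then be written into such a table as an upsert stream. This is a minimal sketch only, assuming a Kafka broker on localhost:9092. The table name LatestLogins, the topic name, and the column types are hypothetical and must match the real Logins schema:

```sql
-- Hypothetical sink table; PRIMARY KEY declares the unique (upsert) key.
-- NOT ENFORCED: Flink does not validate uniqueness; the connector uses it as the record key.
CREATE TABLE LatestLogins (
  `user`    STRING,        -- assumed type of Logins.user
  loginTime TIMESTAMP(3),  -- assumed type of Logins.loginTime
  ip        STRING,        -- assumed type of Logins.ip
  PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'latest-logins',                          -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each new login for a user overwrites the previous row with the same key.
INSERT INTO LatestLogins
SELECT
  `user`,
  LAST_VALUE(loginTime),
  LAST_VALUE(ip)
FROM Logins
GROUP BY `user`;
```

    With upsert-kafka, each update is written as a Kafka record keyed by user, and a deletion of a key is written as a tombstone (a record with a null value).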