I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I cannot find any examples or documentation showing how to enable this mode on a dynamic table.
Examples:
When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.
A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.
So my questions are:
Update: since Flink 1.9, LAST_VALUE is one of the built-in aggregate functions if we use the Blink planner (which is the default since Flink 1.11).
Assuming the existence of the Logins table mentioned in the response of Fabian Hueske above, we can now convert it to an upsert table as simply as:
    -- user is a reserved keyword in Flink SQL, so it is quoted with backticks
    SELECT
      `user`,
      LAST_VALUE(loginTime),
      LAST_VALUE(ip)
    FROM Logins
    GROUP BY `user`
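To make the semantics of that query concrete, here is a minimal plain-Python sketch of what it computes. This is not Flink code. It assumes that Logins is an append-only stream, that rows are processed in arrival order, and that LAST_VALUE keeps the most recently seen value per key. The row layout (user, loginTime, ip), the sample rows, and the helper names are made up for illustration.

```python
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical row layout for the Logins table: (user, loginTime, ip).
Login = Tuple[str, str, str]


def upsert_stream(logins: Iterable[Login]) -> Iterator[Tuple[str, Login]]:
    """Yield one ("UPSERT", row) message per input row; the key is the user."""
    for user, login_time, ip in logins:
        yield ("UPSERT", (user, login_time, ip))


def materialize(logins: Iterable[Login]) -> Dict[str, Tuple[str, str]]:
    """Apply the upsert messages to a table keyed by user."""
    table: Dict[str, Tuple[str, str]] = {}
    for _op, (user, login_time, ip) in upsert_stream(logins):
        # A later message for the same key overwrites the earlier one,
        # which mirrors GROUP BY user with LAST_VALUE on the other columns.
        table[user] = (login_time, ip)
    return table


# Made-up sample data: alice logs in twice, bob once.
logins = [
    ("alice", "2020-01-01 08:00", "10.0.0.1"),
    ("bob", "2020-01-01 08:05", "10.0.0.2"),
    ("alice", "2020-01-01 09:30", "10.0.0.7"),
]
result = materialize(logins)
print(result)
```

The resulting table has exactly one row per user, holding the values of that user's latest login. That is the upsert behavior with respect to the unique key that the quoted documentation describes.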