What I want to achieve is to run a Flink SQL query in streaming mode (simple aggregations like count, min, max), but I also need to load archival data to get valid aggregations. To this end I created a Flink table for a Kafka topic and another one for a table in a relational database. Then I union both tables and apply the deduplication pattern, since some records from the Kafka topic may have already been saved into the relational database.
create table `kafka_catalog`.`default_database`.`dummy` (
recordId STRING,
recordTime TIMESTAMP(3),
proctime AS PROCTIME()
) with (
'connector' = 'kafka',
...
);
create table `jdbc_catalog`.`default_database`.`dummy` (
recordId STRING,
recordTime TIMESTAMP(3),
proctime AS PROCTIME()
) with (
'connector' = 'jdbc',
...
);
create view `dummy_union` as
select * from `kafka_catalog`.`default_database`.`dummy`
union
select * from `jdbc_catalog`.`default_database`.`dummy`
;
create view `dummy_full_history` as
select
*
from (
select
*,
row_number() over (partition by recordId order by proctime asc) as row_num
from
dummy_union
)
where
row_num = 1
;
select * from dummy_full_history;
Unfortunately, according to the query plan, the deduplication optimisation is not applied. Instead, a Calc -> GroupAggregate -> Rank -> Calc chain is generated:
[69]:TableSourceScan(table=[[kafka_catalog, default_database, dummy]], fields=[recordId, recordTime])
+- [70]:Calc(select=[recordId, recordTime, PROCTIME() AS proctime])
[71]:TableSourceScan(table=[[jdbc_catalog, default_database, dummy]], fields=[recordId, recordTime])
+- [72]:Calc(select=[recordId, recordTime, PROCTIME() AS proctime])
[74]:Calc(select=[recordId, recordTime, PROCTIME_MATERIALIZE(proctime) AS proctime])
[76]:GroupAggregate(groupBy=[recordId, proctime], select=[recordId, proctime])
[78]:Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[recordId], orderBy=[proctime ASC], select=[recordId, proctime])
+- [79]:Calc(select=[recordId, CAST(proctime AS VARCHAR(2147483647)) AS proctime, 1 AS row_num])
+- [80]:ConstraintEnforcer[NotNullEnforcer(fields=[proctime, row_num])]
+- Sink: Collect table sink
When I apply deduplication on a single table, it works like a charm:
[4]:Deduplicate(keep=[FirstRow], key=[recordId], order=[PROCTIME])
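The single-table query was roughly the following (a sketch, reusing the same ROW_NUMBER pattern but reading only the Kafka table):

select
  *
from (
  select
    *,
    row_number() over (partition by recordId order by proctime asc) as row_num
  from
    `kafka_catalog`.`default_database`.`dummy`
)
where
  row_num = 1
;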
Any suggestions on how to make this work?
You should use union all instead of union.
create view `dummy_union` as
select * from `kafka_catalog`.`default_database`.`dummy`
union all
select * from `jdbc_catalog`.`default_database`.`dummy`
;
According to the docs (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/set-ops/), union returns only distinct rows, while union all keeps duplicates. The implicit distinct is what adds the GroupAggregate to your plan and materialises the proctime attribute (visible as PROCTIME_MATERIALIZE), so the Rank over it can no longer be translated into the Deduplicate operator. With union all the time attribute is preserved and the deduplication optimisation applies again.
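To double-check that the optimisation kicks in after the change, you can print the plan again; a minimal sketch (run against the recreated dummy_full_history view):

explain plan for
select * from dummy_full_history;

The optimised plan should then show a Deduplicate(keep=[FirstRow], ...) node instead of the GroupAggregate -> Rank chain.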