apache-flink, flink-sql

Can I deduplicate records in a union of two Flink SQL tables?


What I want to achieve is to run a Flink SQL query in streaming mode (simple aggregations like count, min, max), but I need to load archival data as well to have valid aggregations. To this end I created a Flink table for a Kafka topic and another for a table in a relational database. Then I union both tables and apply the deduplication pattern, since some records from the Kafka topic may have already been saved into the relational database.

create table `kafka_catalog`.`default_database`.`dummy` (
    recordId STRING,
    recordTime TIMESTAMP(3),
    proctime AS PROCTIME()
) with (
    'connector' = 'kafka',
    ...
);

create table `jdbc_catalog`.`default_database`.`dummy` (
    recordId STRING,
    recordTime TIMESTAMP(3),
    proctime AS PROCTIME()
) with (
    'connector' = 'jdbc',
    ...
);

create view `dummy_union` as
    select * from `kafka_catalog`.`default_database`.`dummy`
    union
    select * from `jdbc_catalog`.`default_database`.`dummy`
;

create view `dummy_full_history` as
select
    *
from (
    select
        *,
        row_number() over (partition by recordId order by proctime asc) as row_num
    from
        dummy_union
)
where
    row_num = 1
;

select * from dummy_full_history;

Unfortunately, according to the query plan, the deduplication optimisation is not applied. Instead, a Calc -> GroupAggregate -> Rank -> Calc chain is used:

[69]:TableSourceScan(table=[[kafka_catalog, default_database, dummy]], fields=[recordId, recordTime])
+- [70]:Calc(select=[recordId, recordTime, PROCTIME() AS proctime])
[71]:TableSourceScan(table=[[jdbc_catalog, default_database, dummy]], fields=[recordId, recordTime])
+- [72]:Calc(select=[recordId, recordTime, PROCTIME() AS proctime])
 [74]:Calc(select=[recordId, recordTime, PROCTIME_MATERIALIZE(proctime) AS proctime])
 [76]:GroupAggregate(groupBy=[recordId, proctime], select=[recordId, proctime])
 [78]:Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[recordId], orderBy=[proctime ASC], select=[recordId, proctime])
+- [79]:Calc(select=[recordId, CAST(proctime AS VARCHAR(2147483647)) AS proctime, 1 AS row_num])
   +- [80]:ConstraintEnforcer[NotNullEnforcer(fields=[proctime, row_num])]
      +- Sink: Collect table sink

When I apply deduplication on a single table, it works like a charm:

 [4]:Deduplicate(keep=[FirstRow], key=[recordId], order=[PROCTIME])
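
For reference, this is the same row_number pattern applied directly to the Kafka table, which is what produces that Deduplicate node:

select
    *
from (
    select
        *,
        row_number() over (partition by recordId order by proctime asc) as row_num
    from
        `kafka_catalog`.`default_database`.`dummy`
)
where
    row_num = 1
;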

Any suggestions on how to make this work?


Solution

  • You should use union all. A plain union removes duplicate rows, and that implicit distinct is what shows up as the GroupAggregate in your plan; its updating output prevents the planner from translating the row_number query into the Deduplicate operator. union all keeps all rows, so the deduplication pattern is recognised:

    create view `dummy_union` as
        select * from `kafka_catalog`.`default_database`.`dummy`
        union all
        select * from `jdbc_catalog`.`default_database`.`dummy`
    ;
    

    According to the docs on set operations: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/set-ops/
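
    With the view recreated using union all, you can confirm the fix straight from the plan. A minimal check (explain plan for is standard Flink SQL; the view names are the ones defined above):

    -- dummy_full_history is unchanged from the question; only the
    -- underlying union view now uses union all. The plan should now
    -- contain Deduplicate(keep=[FirstRow], key=[recordId], order=[PROCTIME])
    -- instead of the GroupAggregate -> Rank chain.
    explain plan for
    select * from dummy_full_history;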