flink-streaming, flink-sql

Flink SQL Streaming - How to Join tables effectively where change of record is nondeterministic


Kafka topics (input: table1, table2; output: table3)

Flink SQL Streaming job

CREATE TEMPORARY VIEW distinct_table1 AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date DESC) AS rownum
    FROM table1
)
WHERE rownum = 1;


CREATE TEMPORARY VIEW distinct_table2 AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date DESC) AS rownum
    FROM table2
)
WHERE rownum = 1;


INSERT INTO table3
SELECT t1.col1, t1.col2.., t1.coln, t2.col1, t2.col2, t2.coln
FROM distinct_table1 t1
INNER JOIN distinct_table2 t2 ON t1.id = t2.t1_id;

The rate of change for the same record in table1 and table2 is nondeterministic: sometimes 10k/day, and sometimes none for a month. The Kafka topics are configured with compaction.

Edit: The distinct SQL clause creates an operator like Rank[20] -> Calc[21].

The join clause creates an operator something like Join[23] -> Calc[24]. The DAG looks like:

source_t1 ---> rank[20]_calc[21] ---
                                    |>  join[5]_calc[7] ---> Sink
source_t2 ---> rank[10]_calc[11] ---

Question #1: In this case, at some point the state will grow beyond the capacity of the local disk. What happens then? Will the task manager fail?

Question #2: In scenarios like the above, where the rate of change is nondeterministic, is there any configuration that tells RocksDB to keep only the latest record in level 0 and purge the older history, so that it does not need a large amount of disk space and run out of disk? It should not be time based, where a record is purged if no update has happened, because we have to keep at least one record to make the join.

I am assuming that operators like Rank[20] -> Calc[21] and Join[23] -> Calc[24] maintain the history in RocksDB for each id/PK of the corresponding table that has changed.


Solution

  • #1 When the RocksDB state exceeds the capacity of the local disks, then yes, Flink will fail. As a workaround, you could reshard the state by increasing the parallelism.

    #2 What's available is https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/config/#table-exec-state-ttl, which is a time-based state TTL and so not what you've asked for.

    In order to get more control over the state retention policy, you could instead write the job using the DataStream API, but that would be a non-trivial undertaking.
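
    As a rough illustration of the two settings mentioned above, here is a minimal sketch for the Flink SQL client. The values (8 and 30 d) are placeholders picked for illustration, not recommendations. The parallelism setting affects jobs submitted after the statement runs, so an already running job would have to be stopped and resubmitted. Note that table.exec.state.ttl expires state for keys that have been idle for longer than the TTL, so ids that see no change for a month could drop out of the join state, which is why it does not meet the requirement in the question.

```sql
-- Assumed value: spread the keyed state over more parallel subtasks.
SET 'parallelism.default' = '8';

-- Assumed value: expire state for keys that have been idle for 30 days.
-- This is time based: an id with no updates for longer than this is forgotten.
SET 'table.exec.state.ttl' = '30 d';
```

    Both statements would be run before the INSERT INTO statement in the same session.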