We have four CDC sources defined, and we need to combine their data into one result table. We create a table for each source using the SQL API, e.g.:
"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
" WRK_SDL_DEF_NO STRING,\n" +
" HTR_FROM_DT BIGINT,\n" +
...
" update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
" WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
After we've defined each table, we create a new table by running the following query:
"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
" PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";
Note some rows have been left out for formatting reasons.
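For reference, the set of ids this query should eventually converge to can be computed offline, since an inner join on WRK_SDL_DEF_NO only keeps ids present in all four tables, with PAA33 restricted to rows where LGG_CD = 'NL'. The following is a minimal sketch in plain Java, not part of our job: the `expectedIds` helper and the sample ids are hypothetical, and it assumes each table has already been reduced to its current set of ids.

```java
import java.util.Set;
import java.util.TreeSet;

public class ExpectedJoinCheck {

    // Hypothetical helper: ids that survive the four-way inner join.
    // paa33NlIds is assumed to hold only the ids that have an 'NL' row in PAA33.
    static Set<String> expectedIds(Set<String> paa30Ids,
                                   Set<String> paa33NlIds,
                                   Set<String> paa31Ids,
                                   Set<String> paa32Ids) {
        Set<String> ids = new TreeSet<>(paa30Ids); // sorted copy, inputs stay untouched
        ids.retainAll(paa33NlIds);                 // INNER JOIN PAA33 ... AND LGG_CD = 'NL'
        ids.retainAll(paa31Ids);                   // INNER JOIN PAA31
        ids.retainAll(paa32Ids);                   // INNER JOIN PAA32
        return ids;
    }

    public static void main(String[] args) {
        // Made-up sample ids, for illustration only.
        Set<String> paa30 = Set.of("A", "B", "C");
        Set<String> paa33Nl = Set.of("A", "C");
        Set<String> paa31 = Set.of("A", "B", "C");
        Set<String> paa32 = Set.of("A", "C", "D");
        System.out.println(expectedIds(paa30, paa33Nl, paa31, paa32)); // prints [A, C]
    }
}
```

The result does not depend on the order in which the ids arrive, which is why we expect the same row count on every run.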
The issue we're running into is that executing this exact job produces inconsistent results: sometimes we get 1750 result rows (correct), but most of the time the number of result rows is lower and varies from run to run.
This is the plan overview for the job in Flink. The number of records sent from each source is correct, but the number of records sent by the first join is not:
[Image: Flink job execution plan and record counts]
What could be the cause, and how can we get consistent joins across all data sources?
We've been able to get consistent results, even for bigger datasets, by enabling MiniBatch Aggregation:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "500 ms");
configuration.setString("table.exec.mini-batch.size", "5000");
This seems to fix the consistency issue for both the local filesystem connector and the Flink Pulsar connector.
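To keep the three options together and easy to tune, they could be collected in one place before being applied. This is only a sketch in plain Java: the `miniBatchOptions` helper is hypothetical, the option keys are the ones used above, and the positivity check reflects our assumption that both values should be greater than zero when mini-batch is enabled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MiniBatchSettings {

    // Hypothetical helper: builds the three mini-batch options as key/value strings.
    static Map<String, String> miniBatchOptions(long allowLatencyMs, long size) {
        // Assumption: both values are expected to be positive when mini-batch is enabled.
        if (allowLatencyMs <= 0 || size <= 0) {
            throw new IllegalArgumentException("allow-latency and size must be positive");
        }
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("table.exec.mini-batch.enabled", "true");
        options.put("table.exec.mini-batch.allow-latency", allowLatencyMs + " ms");
        options.put("table.exec.mini-batch.size", Long.toString(size));
        return options;
    }

    public static void main(String[] args) {
        // Same values as in the settings above.
        miniBatchOptions(500, 5000).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each entry can then be passed to `configuration.setString(key, value)`, as in the snippet above.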
From these findings, it seems Flink was struggling with the overhead of state management at our throughput. We still need to assess how a realistic CDC initial load behaves, but so far enabling MiniBatch Aggregation seems promising.
Thanks @david-anderson for thinking with us and trying to figure this out.