Tags: apache-flink, flink-streaming, flink-sql

Inconsistent results when joining multiple tables in Flink


We have four CDC sources whose data we need to combine into one result table. We create a table for each source using the SQL API, e.g.:

"CREATE TABLE IF NOT EXISTS PAA31 (\n" +
"    WRK_SDL_DEF_NO     STRING,\n" +
"    HTR_FROM_DT        BIGINT,\n" +
...
"    update_time        TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
"    PRIMARY KEY (WRK_SDL_DEF_NO) NOT ENFORCED,\n" +
"    WATERMARK FOR update_time AS update_time\n" +
") WITH ('value.format' = 'debezium-json' ... )";
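If all four source tables share the metadata column, primary key column, and watermark shown for PAA31 (an assumption, since only PAA31's DDL is shown), one option is to generate the statements from a single helper. The sketch below is hypothetical: `CdcTableDdl`, its parameters, and the sample columns are illustrative. The connector options are supplied by the caller because the real ones are elided above; only `'value.format' = 'debezium-json'` is taken from the question. Each generated string would then be passed to `TableEnvironment.executeSql(...)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds one CREATE TABLE statement for a CDC source so the
// shared parts (source-timestamp metadata column, primary key, watermark) are
// written once. Connector options come from the caller (the real ones are elided).
class CdcTableDdl {

    static String build(String table, Map<String, String> columns, String primaryKey,
                        Map<String, String> options) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(table).append(" (\n");
        // Physical columns, e.g. "WRK_SDL_DEF_NO STRING"
        columns.forEach((name, type) ->
                ddl.append("    ").append(name).append(' ').append(type).append(",\n"));
        // Debezium source timestamp exposed as a virtual metadata column
        ddl.append("    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n");
        ddl.append("    PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED,\n");
        ddl.append("    WATERMARK FOR update_time AS update_time\n");
        // Render the options as 'key' = 'value', ...
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return ddl.append(") WITH (").append(with).append(")").toString();
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("WRK_SDL_DEF_NO", "STRING");
        cols.put("HTR_FROM_DT", "BIGINT");
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("value.format", "debezium-json"); // other options omitted, as in the question
        System.out.println(build("PAA31", cols, "WRK_SDL_DEF_NO", opts));
    }
}
```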

After we've defined each table, we create a new table by running the following query:

"SELECT PAA30.WRK_SDL_DEF_NO as id,\n" +
"       PAA33.DSB_TX as description,\n" +
...
"FROM PAA30\n" +
"INNER JOIN PAA33 ON PAA30.WRK_SDL_DEF_NO = PAA33.WRK_SDL_DEF_NO AND PAA33.LGG_CD = 'NL' \n" +
"INNER JOIN PAA31 ON PAA30.WRK_SDL_DEF_NO = PAA31.WRK_SDL_DEF_NO \n" +
"INNER JOIN PAA32 ON PAA30.WRK_SDL_DEF_NO = PAA32.WRK_SDL_DEF_NO";

Note that some lines have been left out for readability.
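To check the expected row count independently of Flink, one can intersect the join keys of the four source snapshots. This is a minimal sketch assuming every table holds at most one row per WRK_SDL_DEF_NO once PAA33 is restricted to LGG_CD = 'NL', so the inner join emits exactly one row per key present in all four tables. `ExpectedJoinCount`, the `Paa33Row` type, and the sample keys are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Expected size of the 4-way INNER JOIN on WRK_SDL_DEF_NO, assuming one row per key
// per table (after PAA33 is filtered on LGG_CD = 'NL').
class ExpectedJoinCount {

    // Hypothetical PAA33 row shape: join key plus language code.
    static final class Paa33Row {
        final String wrkSdlDefNo;
        final String lggCd;

        Paa33Row(String wrkSdlDefNo, String lggCd) {
            this.wrkSdlDefNo = wrkSdlDefNo;
            this.lggCd = lggCd;
        }
    }

    static int expectedRows(Set<String> paa30, List<Paa33Row> paa33,
                            Set<String> paa31, Set<String> paa32) {
        Set<String> keys = new HashSet<>(paa30);
        Set<String> nlKeys = new HashSet<>();
        for (Paa33Row row : paa33) {
            if ("NL".equals(row.lggCd)) { // mirrors "AND PAA33.LGG_CD = 'NL'"
                nlKeys.add(row.wrkSdlDefNo);
            }
        }
        keys.retainAll(nlKeys);
        keys.retainAll(paa31);
        keys.retainAll(paa32);
        return keys.size();
    }

    public static void main(String[] args) {
        Set<String> paa30 = Set.of("A", "B", "C");
        List<Paa33Row> paa33 = List.of(new Paa33Row("A", "NL"), new Paa33Row("A", "EN"),
                                       new Paa33Row("B", "EN"), new Paa33Row("C", "NL"));
        Set<String> paa31 = Set.of("A", "B", "C");
        Set<String> paa32 = Set.of("A", "C");
        System.out.println(expectedRows(paa30, paa33, paa31, paa32)); // 2 (keys A and C)
    }
}
```

If PAA33 can contain several 'NL' rows for the same key, the join would emit more rows than this count, so the assumption should be checked against the data first.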

The problem is that running this exact job gives inconsistent results: sometimes we get 1750 result rows (correct), but most of the time we get fewer, and the count varies from run to run.

Below is the plan overview for the job in Flink. The number of records sent by each source is correct, but the number of records sent by the first join is not:

[Image: Flink Job Execution Plan and numbers]
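For insert-only input, a streaming regular inner join should emit the same rows whatever order the records arrive in, so a count that varies between runs over the same input is not expected from the join semantics themselves. The following is a simplified model of a two-input symmetric hash join (each side keeps its rows in keyed state and probes the other side when a record arrives). It is not Flink's actual join operator, it ignores retractions and updates, and `SymmetricHashJoin` and its `Event` type are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a streaming inner equi-join over two insert-only inputs.
class SymmetricHashJoin {

    // Hypothetical input event: which side it came from, its join key, and a payload.
    static final class Event {
        final boolean left;
        final String key;
        final String value;

        Event(boolean left, String key, String value) {
            this.left = left;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void process(Event e) {
        Map<String, List<String>> own = e.left ? leftState : rightState;
        Map<String, List<String>> other = e.left ? rightState : leftState;
        // Keep the row so later arrivals on the other side can still match it.
        own.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
        // Emit one joined row per matching row already stored on the other side.
        for (String match : other.getOrDefault(e.key, List.of())) {
            output.add(e.left ? e.value + "|" + match : match + "|" + e.value);
        }
    }

    static int joinedRowCount(List<Event> events) {
        SymmetricHashJoin join = new SymmetricHashJoin();
        events.forEach(join::process);
        return join.output.size();
    }

    public static void main(String[] args) {
        List<Event> inOrder = List.of(new Event(true, "A", "l1"), new Event(false, "A", "r1"),
                                      new Event(true, "B", "l2"), new Event(false, "C", "r2"));
        List<Event> reversed = new ArrayList<>(inOrder);
        Collections.reverse(reversed);
        // Both orders produce one joined row (key A); B and C have no partner.
        System.out.println(joinedRowCount(inOrder) + " " + joinedRowCount(reversed)); // 1 1
    }
}
```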

What could be causing this, and how can we get consistent results when joining all the data sources?


Solution

  • We've been able to get consistent results, even for bigger datasets, by enabling MiniBatch aggregation:

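    // Configuration of the job's TableEnvironment (here called tableEnv)
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "500 ms");
    configuration.setString("table.exec.mini-batch.size", "5000");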
    

    This seems to fix the consistency issue for both the local filesystem connector and the Flink Pulsar connector.

    From these findings, it seems Flink was struggling with the state-management overhead at our throughput. We still need to evaluate a realistic CDC initial load, but so far enabling MiniBatch aggregation looks promising.

    Thanks @david-anderson for thinking with us and trying to figure this out.
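If the state-overhead explanation is right, the effect of these settings can be pictured with a simplified model: records are buffered and flushed when the buffer reaches a size limit (compare `table.exec.mini-batch.size`) or a latency bound has elapsed (compare `table.exec.mini-batch.allow-latency`), and updates for the same key are folded together, so state is written once per key per batch instead of once per record. This is a conceptual sketch, not Flink's implementation; `MiniBatchBuffer`, the explicit clock parameter, and the "keep the latest value per key" folding are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model of mini-batching: buffer updates, fold them per key, and touch
// the (simulated) keyed state once per key when the batch is flushed.
class MiniBatchBuffer {

    private final int maxSize;          // compare table.exec.mini-batch.size
    private final long allowLatencyMs;  // compare table.exec.mini-batch.allow-latency
    private final Map<String, String> pending = new LinkedHashMap<>();
    private final Map<String, String> state = new HashMap<>(); // stands in for keyed state
    private int buffered = 0;
    private long batchStartMs = -1;
    int stateWrites = 0;

    MiniBatchBuffer(int maxSize, long allowLatencyMs) {
        this.maxSize = maxSize;
        this.allowLatencyMs = allowLatencyMs;
    }

    // nowMs is passed in explicitly to keep the model deterministic. The latency bound
    // is only checked when a record arrives; a real system would also flush on a timer.
    void add(String key, String value, long nowMs) {
        if (buffered == 0) {
            batchStartMs = nowMs;
        }
        pending.put(key, value); // a later update for the same key replaces the earlier one
        buffered++;
        if (buffered >= maxSize || nowMs - batchStartMs >= allowLatencyMs) {
            flush();
        }
    }

    void flush() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            state.put(e.getKey(), e.getValue());
            stateWrites++;
        }
        pending.clear();
        buffered = 0;
    }

    String get(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        MiniBatchBuffer batch = new MiniBatchBuffer(5000, 500);
        for (int i = 0; i < 1000; i++) {
            batch.add("key-" + (i % 10), "v" + i, i / 10); // 1000 updates to 10 keys within 100 ms
        }
        batch.flush();
        System.out.println(batch.stateWrites + " " + batch.get("key-3")); // 10 v993
    }
}
```

In this model, 1000 updates to 10 keys cost 10 state writes instead of 1000, which is the kind of saving the MiniBatch settings aim for.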