apache-flink, flink-sql

Why do my Flink SQL queries have very different checkpoint sizes?


While using Flink Table SQL in my project, I found that whenever a query contains a GROUP BY clause, the checkpoint size increases dramatically.

For example,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name
FROM
    FCBOX_POST_COUNT_VIEW

With this query, the checkpoint size is less than 500 KB.

But when I use a query like this,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name,
    sum(ed_post_count)
FROM
    FCBOX_POST_COUNT_VIEW
GROUP BY
    sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)

the checkpoint size is more than 70 MB, even when no messages have been processed at all:

[Image]

But when I use the DataStream API with keyBy instead of the Table SQL GROUP BY, the checkpoint size is normal: less than 1 MB.

Why?

-------updated at 2019-03-25--------

After doing some tests and reading source code, we found that the reason for this was RocksDB.

With RocksDB as the state backend, the checkpoint size is more than about 5 MB per key. With the filesystem state backend, it drops to less than 100 KB per key.

Why does RocksDB need so much space to hold the state? When should we choose RocksDB?


Solution

  • First of all, I would not consider 70 MB to be huge state. There are many Flink jobs with multiple TBs of state. As for why the state sizes of the two queries differ:

    The first query is a simple projection query, which means that every record can be independently processed. Hence, the query does not need to "remember" any records but only the stream offsets for recovery.

    The second query performs a window aggregation and needs to remember an intermediate result (the partial sum) for every window until time has progressed far enough that the result is final and can be emitted.

    Since Flink SQL queries are translated into DataStream operators, there is not much difference between a SQL query and implementing the aggregation with keyBy().window(). Both run pretty much the same code.

    Update: The increased state size has been traced to the RocksDBStateBackend. This overhead is not incurred per key but per stateful operator. Since the RocksDBStateBackend is meant to hold state sizes of multiple GBs to TBs, an overhead of a few MB is negligible.
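The difference between the two queries described above can be sketched with a toy model. This is plain Java and not Flink code: the class names `Projection` and `WindowedSum`, the string key format, and the explicit `fireUpTo` call are all hypothetical and exist only for illustration. The point is just that the projection keeps nothing between records, while the windowed sum has to hold one partial sum per key and window until that window closes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java, not the Flink API. All names are hypothetical.
class CheckpointStateSketch {

    // Stateless projection: every record is mapped on its own, so there is
    // nothing to snapshot apart from the source offsets (not modeled here).
    static final class Projection {
        String process(String staDate, long companyId, String companyName, long edPostCount) {
            return staDate + "," + companyId + "," + companyName;
        }

        int stateEntries() {
            return 0;
        }
    }

    // Keyed tumbling-window sum: a partial sum per (window, key) is kept
    // until the window closes, and a checkpoint would have to include it.
    static final class WindowedSum {
        private final long windowSizeMs;
        // window start (ms) -> grouping key -> partial sum
        private final Map<Long, Map<String, Long>> partialSums = new HashMap<>();

        WindowedSum(long windowSizeMs) {
            this.windowSizeMs = windowSizeMs;
        }

        void process(String key, long timestampMs, long value) {
            long windowStart = timestampMs - (timestampMs % windowSizeMs);
            partialSums.computeIfAbsent(windowStart, w -> new HashMap<>())
                       .merge(key, value, Long::sum);
        }

        // Emits and discards every window that ends at or before nowMs.
        List<String> fireUpTo(long nowMs) {
            List<String> out = new ArrayList<>();
            partialSums.entrySet().removeIf(window -> {
                if (window.getKey() + windowSizeMs > nowMs) {
                    return false; // window still open, state must be kept
                }
                window.getValue().forEach(
                    (key, sum) -> out.add(key + "@" + window.getKey() + "=" + sum));
                return true;
            });
            return out;
        }

        int stateEntries() {
            return partialSums.values().stream().mapToInt(Map::size).sum();
        }
    }

    public static void main(String[] args) {
        WindowedSum sum = new WindowedSum(1000L); // 1-second tumbling window
        sum.process("2019-03-25|1|acme", 100L, 5L);
        sum.process("2019-03-25|1|acme", 900L, 7L);
        System.out.println("entries held before firing: " + sum.stateEntries());
        System.out.println("emitted: " + sum.fireUpTo(1000L));
        System.out.println("entries held after firing: " + sum.stateEntries());
    }
}
```

Note that this sketch only models the logical state (the partial sums). It says nothing about the fixed per-operator overhead of the RocksDBStateBackend mentioned in the update, which is present even when no records have been processed.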