apache-kafka apache-flink flink-streaming flink-sql

How to join in Flink with State Time-To-Live (TTL)


In a Flink job, I read a Kafka stream and apply some joins before saving the data to a database. The Kafka topic contains two types of records, so I first join the two records into a single row and then save that row to the database. Only the latest data should be stored in the database.

For example:

{"id":1, "item":"1", "amount":300, "type":"revenue"}
{"id":1, "item":"1", "amount":30, "type":"profit"}

I need the data in the table to look like this:

id | item | revenue | profit
1  |  1   | 300     | 30

I create a temporary view named "tableA" on top of the Kafka source with createTemporaryView and then apply the following join logic:

Table tr = tableEnv.sqlQuery("select " +
            " coalesce(a.id, b.id) id," +
            " coalesce(a.item, b.item) item," +
            " a.amount as revenue," +
            " b.amount as profit" +
            " from " +
            " (select * from tableA" +
            " where type='revenue') a" +
            " full outer join " +
            " (select * from tableA" +
            " where type='profit') b" +
            " on a.id=b.id and a.item=b.item");

Finally, I save the result to the database table. This works, but Flink keeps all of the join's state: if id=1 arrives 100 times, Flink keeps 100 entries for it in state. Eventually this will cause memory problems.
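The growth described above can be made concrete with a toy model in plain Java. This is not Flink's join operator, and the class and method names are hypothetical. The model assumes, as the question describes, that the join buffers every revenue and profit row per (id, item) key. Flink's real state layout differs in detail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Flink code: a regular (unbounded) join buffers every input row
// per join key on each side, because a future row from the other side might
// still have to be matched against any of them.
public class RegularJoinStateDemo {

    // Hypothetical event type mirroring the Kafka messages in the question.
    public static final class Event {
        final int id;
        final String item;
        final int amount;
        final String type;

        public Event(int id, String item, int amount, String type) {
            this.id = id;
            this.item = item;
            this.amount = amount;
            this.type = type;
        }
    }

    // One buffer per join side, keyed by "id|item" (the join condition).
    private final Map<String, List<Event>> revenueSide = new HashMap<>();
    private final Map<String, List<Event>> profitSide = new HashMap<>();

    public void onEvent(Event e) {
        String key = e.id + "|" + e.item;
        Map<String, List<Event>> side =
                "revenue".equals(e.type) ? revenueSide : profitSide;
        // The join cannot tell that an older row is obsolete, so it keeps them all.
        side.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
    }

    // Total rows held in both sides' buffers for one join key.
    public int storedRows(String key) {
        return revenueSide.getOrDefault(key, List.of()).size()
                + profitSide.getOrDefault(key, List.of()).size();
    }

    public static void main(String[] args) {
        RegularJoinStateDemo join = new RegularJoinStateDemo();
        for (int i = 0; i < 100; i++) {
            join.onEvent(new Event(1, "1", 300 + i, "revenue"));
        }
        join.onEvent(new Event(1, "1", 30, "profit"));
        System.out.println(join.storedRows("1|1")); // prints 101
    }
}
```

Only one revenue row and one profit row per key are ever needed for the output. Even so, the buffered state grows by one row per update.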

I'm new to Flink. Is there a setting (such as TTL) that makes Flink keep only the last updated value and remove all previous state? Also, how should I configure RocksDB so that it stores this state robustly?

Flink version: 1.13.6


Solution

  • Flink's state TTL mechanism doesn't have the right behavior for this use case. State TTL keeps too much state, as it retains all recent state, rather than only the most recent state. And worse, it drops the most recent state when it is no longer recent enough.
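Both problems with TTL described above show up in a small simulation. This is a hypothetical plain-Java model of age-based expiry on the rows buffered for one key, not Flink's StateTtlConfig. The one-minute TTL and the timestamps are made-up values for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink's StateTtlConfig: rows for one key expire purely by age.
// Expiry keeps every row younger than the TTL (too much state) and eventually
// drops the newest row as well (losing the value we actually wanted).
public class TtlStateDemo {

    private static final class Entry {
        final int amount;
        final long writtenAt;

        Entry(int amount, long writtenAt) {
            this.amount = amount;
            this.writtenAt = writtenAt;
        }
    }

    private final long ttlMillis;
    private final List<Entry> rows = new ArrayList<>();

    public TtlStateDemo(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Every update is appended; nothing is replaced.
    public void add(int amount, long now) {
        rows.add(new Entry(amount, now));
    }

    // Drops rows whose age has reached the TTL and returns how many remain.
    public int visibleRows(long now) {
        rows.removeIf(e -> now - e.writtenAt >= ttlMillis);
        return rows.size();
    }

    public static void main(String[] args) {
        TtlStateDemo state = new TtlStateDemo(60_000); // hypothetical 1-minute TTL
        for (int i = 0; i < 10; i++) {
            state.add(300 + i, i * 1_000L); // ten updates, one per second
        }
        System.out.println(state.visibleRows(10_000));  // prints 10: all recent rows kept
        System.out.println(state.visibleRows(120_000)); // prints 0: the latest value is gone too
    }
}
```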

    One of the Immerok Apache Flink Cookbook recipes covers this case; see the streaming table workflow in this recipe about keeping track of each customer's most recent transaction. This is an example of a top-n query, where n equals 1:

    SELECT [column_list]
    FROM (
       SELECT [column_list],
         ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
           ORDER BY time_attr DESC) AS rownum
       FROM table_name)
    WHERE rownum = 1
    

    The key here is expressing the constraint in the query itself, so that the SQL engine creates a plan that only retains the necessary state.
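For the question's data, the top-n (n = 1) idea means two steps. First, keep one row per (id, item, type). Then combine the revenue and profit rows per (id, item).

The following plain-Java sketch models only those semantics. It is not Flink code, and the class and method names are hypothetical. It assumes arrival order stands in for the time attribute in ORDER BY time_attr DESC.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, NOT Flink code, of the question's pipeline rewritten as a top-1
// query: keep only the latest row per (id, item, type), then combine the
// revenue and profit rows for each (id, item) like the full outer join does.
public class LatestRowJoinDemo {

    // Latest amount per "id|item|type". Overwriting on each arrival plays the
    // role of ROW_NUMBER() ... ORDER BY <time> DESC with rownum = 1, using
    // arrival order as the (assumed) time attribute.
    private final Map<String, Integer> latest = new HashMap<>();

    public void onEvent(int id, String item, int amount, String type) {
        latest.put(id + "|" + item + "|" + type, amount);
    }

    // Number of rows held in state: at most one per (id, item, type).
    public int storedRows() {
        return latest.size();
    }

    // One output row "id | item | revenue | profit"; a side that has not
    // arrived yet shows as null, as in a full outer join.
    public String row(int id, String item) {
        Integer revenue = latest.get(id + "|" + item + "|revenue");
        Integer profit = latest.get(id + "|" + item + "|profit");
        return id + " | " + item + " | " + revenue + " | " + profit;
    }

    public static void main(String[] args) {
        LatestRowJoinDemo demo = new LatestRowJoinDemo();
        for (int i = 0; i < 100; i++) {
            demo.onEvent(1, "1", 201 + i, "revenue"); // the last update is 300
        }
        demo.onEvent(1, "1", 30, "profit");
        System.out.println(demo.storedRows()); // prints 2
        System.out.println(demo.row(1, "1"));  // prints 1 | 1 | 300 | 30
    }
}
```

However many updates arrive for id=1, the model holds one row per partition. The rownum = 1 constraint in the query is what tells the SQL engine this bound applies.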

    Disclaimer: I work for Immerok.