
How to use Flink Temporal Tables?


The new Temporal Tables in Flink look awesome, but I have not yet been able to make them work. As I cannot find any working examples, I wonder if anyone else has gotten them to work and can point out what I'm doing wrong.

Here's a little bit of context:

The query:

SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id

The setup:

// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);

// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);

I'm getting no rows whatsoever and no errors. I've tried flipping the query by changing which table I register as temporal, but with no success. I have also looked at the "ts" column, and the dates there make me believe I should get at least a few rows.

Any help is appreciated.

P.S. I'm running this on historical data from Kafka, partitioned on "id", which is also the row key.


Solution

  • You can find fully working code examples in the form of Flink's own tests. The content of those two tests (one for processing time, one for event time) is more or less repeated in the documentation. You can start with those examples and convert them step by step to your exact use case. It may help to start with a pre-defined set of data and only later switch to reading from Kafka.

    Regarding your issue, it is unclear from your code snippet what is wrong. Some potential issues:

    • Watermarks are not assigned or are not increasing (see the assignTimestampsAndWatermarks() call in the linked testEventTimeInnerJoin()). The temporal join operator emits data only when the watermark advances.
    • The row times of the two tables you are trying to join are out of sync. If site has no row that is old enough to be joined with the sitemembership records, the result will be empty. For example, this happens if all records from site have time fields from 2019 while sitemembership only has records from 2018.
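
To make the two points above more concrete, here is a rough plain-Java model of how an event-time temporal join behaves for a single key. This is not Flink code and not how Flink implements the operator: the class TemporalJoinSketch and its methods are hypothetical names made up for illustration, a single watermark stands in for the combined watermark of both inputs, and years are used as stand-ins for timestamps.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of an event-time temporal join for ONE key. Not Flink code:
// the class and method names are hypothetical and exist only for illustration.
class TemporalJoinSketch {
    // Versions of the "site" row, ordered by rowtime.
    private final TreeMap<Long, String> siteVersions = new TreeMap<>();
    // Rowtimes of "sitemembership" rows buffered until the watermark passes them.
    private final List<Long> pendingMemberships = new ArrayList<>();

    void addSiteVersion(long rowtime, String value) {
        siteVersions.put(rowtime, value);
    }

    void addMembership(long rowtime) {
        pendingMemberships.add(rowtime);
    }

    // Joins every buffered membership whose rowtime is <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> joined = new ArrayList<>();
        Iterator<Long> it = pendingMemberships.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts > watermark) {
                continue; // watermark has not reached this row yet: keep buffering
            }
            // Latest site version that is not newer than the membership row.
            Map.Entry<Long, String> version = siteVersions.floorEntry(ts);
            if (version != null) {
                joined.add(ts + " -> " + version.getValue());
            }
            it.remove(); // inner join: rows without a matching version are dropped
        }
        return joined;
    }

    public static void main(String[] args) {
        TemporalJoinSketch join = new TemporalJoinSketch();
        join.addSiteVersion(2019L, "site-v1"); // years stand in for timestamps
        join.addMembership(2018L);             // older than any site version
        join.addMembership(2020L);
        System.out.println(join.onWatermark(Long.MIN_VALUE)); // prints []
        System.out.println(join.onWatermark(2021L));          // prints [2020 -> site-v1]
    }
}
```

In this model, nothing comes out while the watermark stays at its initial value, which matches the "no rows, no errors" symptom. Once the watermark advances, the 2018 membership row is silently dropped because no site version is old enough, and only the 2020 row joins.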