I am trying to replicate the temporal join example from the Flink docs, but no results are shown. There are no errors either.
My tables:
CREATE TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'value.format' = 'json'
);
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);
Inserts:
INSERT into currency_rates
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into currency_rates
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));
Join query:
SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
  ON t.currency_code = c.currency_code;
No results are shown for the join query, and I couldn't find any working sample out there.
What am I missing to get this temporal join to work?
The problem has to do with the watermarks. A temporal join doesn't produce an updating/retraction stream, so it must wait for evidence that the currency_rates stream is complete up through the time of the first transaction before it can produce a final result for that transaction. (And so on, for the subsequent transactions.)
If you add
INSERT into currency_rates
VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));
that should be enough to flush out some results.
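To check whether the watermark on currency_rates has actually advanced, one option is to query it directly. This is a minimal sketch, assuming Flink 1.14 or later (where the CURRENT_WATERMARK function is available); the column alias rate_watermark is just an illustrative name:

```sql
-- Shows, for each row, the watermark at the time the row was processed.
-- The value can be NULL until a first watermark has been emitted.
-- With WATERMARK ... AS rate_time - INTERVAL '15' SECONDS, a row stamped
-- 13:00:00 can advance the watermark to at most 12:59:45, which is later
-- than all four transaction times in the question.
SELECT
  currency_code,
  rate_time,
  CURRENT_WATERMARK(rate_time) AS rate_watermark
FROM currency_rates;
```

The same check can be run against transactions using transaction_time, since the join depends on the watermarks of both inputs.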
If that doesn't solve the problem, then the per-partition watermarking that some sources (including Kafka) use could be the cause. You can address this either by ensuring that every Kafka partition has some data, or by setting the table.exec.source.idle-timeout configuration parameter so that idle partitions don't hold back the watermark indefinitely.
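As a sketch of the second option, the idle timeout can be set from the SQL client before running the join. The option key is table.exec.source.idle-timeout; the 5-second value is an arbitrary choice for illustration, and the quoted SET syntax assumes a reasonably recent SQL client (older clients use an unquoted key=value form):

```sql
-- Mark a source partition as idle after 5 seconds without data,
-- so it no longer holds back the overall watermark.
-- The duration here is an assumption; pick one that suits your workload.
SET 'table.exec.source.idle-timeout' = '5 s';
```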
For myself, I also had to add this property to the transactions table:
'scan.startup.mode' = 'earliest-offset',
Without this change to the table DDL I was getting an error. (See https://stackoverflow.com/a/70739865/2000823 for more info on this.)
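For reference, this is roughly how the transactions DDL from the question looks with that property added. Where the property sits inside the WITH clause is my choice; everything else is copied from the question:

```sql
CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  -- added: read the topic from the beginning
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);
```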
These are the results I got from the temporal join:
id   eur_rate  total_eur  total  currency_code  transaction_time
001  0.0139    0.126490   9.10   EURO           2022-01-12 12:50:00.000
003  0.0139    0.168468   12.12  EURO           2022-01-12 12:52:10.000
002  0.0310    0.161200   5.20   CAD            2022-01-12 12:51:10.000