apache-flink flink-sql

Flink temporal join not showing data


I am trying to replicate the temporal join example from the Flink docs, but no results are shown. There are no errors either.

My tables:

CREATE TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '<my-server>',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

Inserts:

INSERT into currency_rates 
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into currency_rates 
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));

INSERT into transactions 
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));

Join query:

SELECT 
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total, 
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

The join query shows no results, and I couldn't find a working sample anywhere.

What am I missing to get this temporal join to work?


Solution

  • The problem has to do with the watermarks. A temporal join doesn't produce an updating/retraction stream, so it must wait for evidence that the currency_rates stream is complete up through the time of the first transaction before it can produce a final result for that transaction. (And so on, for the subsequent transactions.)

    If you add

    INSERT into currency_rates 
    VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));
    

    that should be enough to flush out some results.
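
    To see why the extra row helps, it can be useful to walk through the watermark arithmetic. The sketch below assumes that every Kafka partition receives data and that the join emits a transaction only once the smaller of the two input watermarks has passed its transaction_time. CURRENT_WATERMARK is a built-in function in Flink 1.14 and later; it is used here only as a diagnostic aid and the current_wm alias is illustrative.

    ```sql
    -- Watermark arithmetic for the sample data (a sketch, assuming no idle partitions):
    --   currency_rates: max rate_time 12:37:00 - 15 s = 12:36:45
    --     -> earlier than the first transaction (12:50:00), so nothing is emitted.
    --   After inserting the 13:00:00 rate: 13:00:00 - 15 s = 12:59:45
    --   transactions:   max transaction_time 12:53:20 - 30 s = 12:52:50
    --   join watermark: min(12:59:45, 12:52:50) = 12:52:50
    --     -> transactions 001, 002 and 003 can be emitted; 004 (12:53:20) still waits.

    -- Diagnostic query: show each row next to the watermark in effect when it was processed.
    SELECT
      id,
      transaction_time,
      CURRENT_WATERMARK(transaction_time) AS current_wm
    FROM transactions;
    ```

    The reported watermark usually lags behind the rows, and it is NULL until the first watermark has been emitted, so read it as a rough indicator and not as an exact value.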

    If that doesn't solve the problem, then per-partition watermarking, which some sources (including Kafka) use, could be the cause: a partition that receives no data holds back the overall watermark. You can address this either by ensuring that every Kafka partition has some data, or by setting the table.exec.source.idle-timeout configuration parameter so that idle partitions don't hold back the watermark indefinitely.
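
    In the SQL client, the idle timeout can be set for the session before running the join. The option key in the Flink configuration is table.exec.source.idle-timeout; the 30-second value below is an arbitrary assumption, so pick one that matches how long a partition may legitimately stay silent.

    ```sql
    -- Mark a source partition as idle after 30 seconds without data,
    -- so it no longer holds back the watermark (the value is an example only).
    SET 'table.exec.source.idle-timeout' = '30 s';
    ```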

    In my case, I also had to add this property to the transactions table:

    'scan.startup.mode' = 'earliest-offset',
    

    Without this change to the table DDL I was getting an error. (See https://stackoverflow.com/a/70739865/2000823 for more info on this.)
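
    For reference, the transactions DDL with that property added would look roughly like this. Everything except the scan.startup.mode line is copied from the question, including the <my-server> placeholder.

    ```sql
    CREATE TABLE transactions (
      `id` STRING,
      `currency_code` STRING,
      `total` DECIMAL(10,2),
      `transaction_time` TIMESTAMP(3),
      WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'transactions',
      'properties.bootstrap.servers' = '<my-server>',
      -- Read the topic from the beginning instead of relying on committed offsets.
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'raw',
      'key.fields' = 'id',
      'value.format' = 'json'
    );
    ```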

    These are the results I got from the temporal join:

    id    eur_rate   total_eur    total   currency_code   transaction_time
    001   0.0139     0.126490      9.10   EURO            2022-01-12 12:50:00.000
    003   0.0139     0.168468     12.12   EURO            2022-01-12 12:52:10.000
    002   0.0310     0.161200      5.20   CAD             2022-01-12 12:51:10.000