Search code examples
apache-flinkflink-streamingflink-sql

What is the difference between Lookup and Processing Time Temporal join in Flink?


In my opinion, Processing Time Temporal Join is used for a stream and an external database and always join the latest value in the external database based on the join condition. Also, Processing Time Temporal Join is used when the external table is not feasible to materialize the table as a dynamic table within Flink.

Similarly, Lookup Join is used for a stream and an external database, and always look up a value in the external database based on the join condition.

Will Lookup Join materialize the external database table in Flink? What't the difference between them?


Solution

  • A processing time temporal join is a join between two streams, while a lookup join is a join between a stream and an external database.

    While Flink supports two types of event time temporal joins, one with the FOR SYSTEM_TIME AS OF syntax, and the other using temporal table functions, only the latter approach based on table functions is supported for processing time temporal joins.

    A processing time temporal join works with two streams representing append-only dynamic tables -- e.g.,

    SELECT
      o_amount, r_rate
    FROM
      Orders,
      LATERAL TABLE (Rates(o_proctime))
    WHERE
      r_currency = o_currency
    

    When this temporal join is executed with a processing time attribute (as shown above), each incoming Order will be joined with the latest value from the Rates table/stream. The Orders table/stream will not be materialized at all, and the Rates table/stream will only retain the most recently consumed version of the Rate for each currency.

    Unlike event time temporal joins, processing time temporal joins do not provide deterministic results.

    By contrast, lookup joins execute queries against a lookup source, such as a JDBC database. By default, nothing is materialized in Flink, but some lookup sources (such as JDBC) offer optional caching.

    These lookup joins also do not guarantee deterministic results, and instead execute the join with whatever data is available at the time the join is executed, with that data coming either from the cache or from a query.

    Both temporal joins and lookup joins will NOT update their results. You just get a best-effort result based on what was known to the runtime at the time the join was executed.

    Why does Flink bother offering processing time temporal joins? Why not just use a lookup join instead? Two reasons:

    • A lookup join is more expensive -- it's necessary to query the external database, and wait for a response. Yes, caching is a possibility, but then you're likely to be joining with obsolete data. With a processing time temporal join you are assured of using reasonably fresh data that is automatically updated.

    • Lookup joins require the implementation of a special connector. Temporal joins use the standard streaming connectors, so they are more universally available.