
Implementation of Streaming Join in Flink


I am looking at the various implementations of join in Flink. In batch mode, I have come across the hybrid hash join and the sort-merge join. In both cases, a blocking shuffle is performed before the join, so the output of the operators upstream of the join is materialized to some non-ephemeral storage, as described here.

I am now looking at the streaming join case. I have seen an implementation in which one hash table is built for each of the two inputs. Whenever a record arrives, it is stored in its own input's hash table and also probed against the other input's hash table to produce results. To limit the size of the hash tables, a window bounds how long a record is kept in its hash table. My first question is:

Do all streaming joins require such a window?
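The two-hash-table scheme described above (often called a symmetric hash join) can be sketched in plain Python. This is not Flink's operator, only an illustration under stated assumptions: events arrive in non-decreasing timestamp order, and the event tuple layout and the function name are hypothetical. The window is the only thing that lets rows be evicted; with a very large window, both hash tables keep growing.

```python
from collections import defaultdict, deque
from typing import Any, Hashable, Iterable, Iterator, Tuple

# Hypothetical event layout: (side, timestamp, key, payload); side is "L" or "R".
Event = Tuple[str, int, Hashable, Any]


def windowed_symmetric_hash_join(
    events: Iterable[Event], window: int
) -> Iterator[Tuple[Any, Any]]:
    """Join two interleaved streams on key, keeping each row for `window` time units.

    Assumes events arrive in non-decreasing timestamp order (no late data).
    """
    tables = {"L": defaultdict(list), "R": defaultdict(list)}
    # Arrival order per side, used to expire the oldest rows first.
    arrivals = {"L": deque(), "R": deque()}

    for side, ts, key, payload in events:
        # 1. Evict rows on both sides that have fallen out of the window.
        for s in ("L", "R"):
            q = arrivals[s]
            while q and q[0][0] < ts - window:
                old_ts, old_key, old_payload = q.popleft()
                bucket = tables[s][old_key]
                bucket.remove((old_ts, old_payload))
                if not bucket:
                    del tables[s][old_key]
        # 2. Probe the other side's hash table with the new row.
        other = "R" if side == "L" else "L"
        for _, other_payload in tables[other].get(key, []):
            yield (payload, other_payload) if side == "L" else (other_payload, payload)
        # 3. Insert the new row into its own side's hash table.
        tables[side][key].append((ts, payload))
        arrivals[side].append((ts, key, payload))
```

For example, with the events ("L", 1, "k1", "a1"), ("R", 2, "k1", "b1"), ("R", 10, "k1", "b2"), ("L", 12, "k1", "a2"), a window of 5 yields only ("a1", "b1") and ("a2", "b2"), while a window of 100 also pairs the rows that the smaller window had already evicted.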

Specifically, I want to discuss the implementation of a join in which a large, static Customers table is joined with an Orders stream. In my opinion, the physical implementation should look something like this:

[Figure: Hybrid hash join]

The Customers table is first hash-partitioned on the join key and loaded into hash tables on the join tasks. Then the Orders stream starts flowing in. Because the execution mode is streaming, Orders records are sent directly to the join tasks and probed there, without any materialization.
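The plan above can be sketched outside Flink in two phases: build one hash table per join task from the bounded Customers input, partitioned by the join key, then route each Orders record to the task that owns its key and probe immediately. This is plain Python with hypothetical names, not a Flink API; since probing keeps no Orders state, the orders argument could just as well be an unbounded generator.

```python
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple

Row = Tuple[Hashable, str]  # (join key, payload)


def partition_of(key: Hashable, parallelism: int) -> int:
    # Route a key to a join task; both inputs must use the same function.
    return hash(key) % parallelism


def build_partitions(
    customers: Iterable[Row], parallelism: int
) -> List[Dict[Hashable, List[str]]]:
    """Build phase: hash-partition the bounded Customers input, one hash table per task."""
    tables: List[Dict[Hashable, List[str]]] = [{} for _ in range(parallelism)]
    for key, payload in customers:
        tables[partition_of(key, parallelism)].setdefault(key, []).append(payload)
    return tables


def probe_stream(
    orders: Iterable[Row], tables: List[Dict[Hashable, List[str]]]
) -> Iterator[Tuple[Hashable, str, str]]:
    """Probe phase: each order goes to its task and is joined at once, never stored."""
    parallelism = len(tables)
    for key, order in orders:
        for customer in tables[partition_of(key, parallelism)].get(key, []):
            yield key, customer, order
```

This is an inner join: an order whose customer is missing simply produces no output.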

Does Flink have such a join, or can I implement it myself?


Solution

  • Well, that's exactly how it is implemented for BATCH.

    In STREAMING mode you never have the full Customers table, because a streaming input is by definition unbounded.

    For BATCH, I'll just quote this post from the official Flink blog:

    Flink has streaming runtime operators for many operations, but also specialized operators for bounded inputs [...] The batch join can read one input fully into a hash table and then probe with the other input. The stream join needs to build tables for both sides, because it needs to continuously process both inputs

    The linked post also covers input size: the hash table can spill to disk when an input does not fit in memory. Windowing is not required, but specifying a window will certainly help you meet performance and deployment-size requirements, because it bounds how much state the join has to keep.
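To illustrate what spilling to disk means for a hash join, here is a heavily simplified hybrid hash join in plain Python. It is not Flink's implementation: whole partitions are spilled to temporary files once a hypothetical row budget (max_in_memory_rows) is exceeded, probe rows for spilled partitions are deferred to disk as well, and each spilled partition is assumed to fit in memory in the second pass (real implementations repartition recursively).

```python
import pickle
import tempfile
from typing import Any, Dict, Hashable, Iterable, Iterator, List, Tuple

Row = Tuple[Hashable, str]  # (join key, payload)


def _read_back(f: Any) -> Iterator[Row]:
    """Read every pickled row back from a spill file."""
    f.seek(0)
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            return


def hybrid_hash_join(
    build: Iterable[Row], probe: Iterable[Row], num_partitions: int, max_in_memory_rows: int
) -> Iterator[Tuple[Hashable, str, str]]:
    def part(key: Hashable) -> int:
        return hash(key) % num_partitions

    tables: List[Dict[Hashable, List[str]]] = [{} for _ in range(num_partitions)]
    spilled_build: Dict[int, Any] = {}  # partition -> temp file holding its build rows
    in_memory = 0

    # Build phase: fill in-memory tables, spilling the largest partition when over budget.
    for key, payload in build:
        p = part(key)
        if p in spilled_build:
            pickle.dump((key, payload), spilled_build[p])
            continue
        tables[p].setdefault(key, []).append(payload)
        in_memory += 1
        if in_memory > max_in_memory_rows:
            sizes = {q: sum(map(len, tables[q].values()))
                     for q in range(num_partitions) if q not in spilled_build}
            victim = max(sizes, key=sizes.get)
            f = tempfile.TemporaryFile()
            for k, vals in tables[victim].items():
                for v in vals:
                    pickle.dump((k, v), f)
            spilled_build[victim] = f
            in_memory -= sizes[victim]
            tables[victim] = {}

    # Probe phase: join in-memory partitions now, defer rows of spilled partitions.
    spilled_probe = {p: tempfile.TemporaryFile() for p in spilled_build}
    for key, payload in probe:
        p = part(key)
        if p in spilled_probe:
            pickle.dump((key, payload), spilled_probe[p])
        else:
            for b in tables[p].get(key, []):
                yield key, b, payload

    # Second pass: reload each spilled build partition and join its deferred probe rows.
    for p, build_file in spilled_build.items():
        table: Dict[Hashable, List[str]] = {}
        for k, v in _read_back(build_file):
            table.setdefault(k, []).append(v)
        for k, v in _read_back(spilled_probe[p]):
            for b in table.get(k, []):
                yield k, b, v
        build_file.close()
        spilled_probe[p].close()
```

The output order differs from a fully in-memory join (rows of spilled partitions come last), but the set of joined rows is the same.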


    Now, if you're in STREAMING mode and know that one side won't change, you can still tell Flink so it can optimize for that. Use a temporal join, JOIN <table> FOR SYSTEM_TIME AS OF <table>.{ proctime | rowtime }, to that end:

    Temporal joins take an arbitrary table (left input/probe side) and correlate each row to the corresponding row’s relevant version in the versioned table (right input/build side)
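The semantics quoted above can be illustrated with a small Python sketch of an event-time (rowtime) temporal join: every version of each Customers row is kept, and each Orders row is joined with the version that was valid at the order's timestamp. VersionedTable and temporal_join are hypothetical names; the sketch assumes versions of a key arrive in time order and ignores out-of-order data and state cleanup, which the real operator has to handle. With proctime, the lookup would simply use the latest version.

```python
import bisect
from typing import Dict, Hashable, Iterable, Iterator, List, Optional, Tuple


class VersionedTable:
    """Keeps every version of each key, ordered by the time it became valid."""

    def __init__(self) -> None:
        self._times: Dict[Hashable, List[int]] = {}
        self._values: Dict[Hashable, List[str]] = {}

    def upsert(self, key: Hashable, valid_from: int, value: str) -> None:
        # Assumes versions of a key arrive in increasing valid_from order.
        self._times.setdefault(key, []).append(valid_from)
        self._values.setdefault(key, []).append(value)

    def as_of(self, key: Hashable, ts: int) -> Optional[str]:
        """Return the version of `key` valid at time `ts`, or None if none existed yet."""
        times = self._times.get(key)
        if not times:
            return None
        i = bisect.bisect_right(times, ts) - 1
        return self._values[key][i] if i >= 0 else None


def temporal_join(
    orders: Iterable[Tuple[Hashable, int, str]], customers: VersionedTable
) -> Iterator[Tuple[str, str]]:
    # Each order (probe side) is matched with the customer version valid at its timestamp.
    for key, ts, order in orders:
        version = customers.as_of(key, ts)
        if version is not None:  # inner-join semantics: no version, no output
            yield order, version
```

An order placed before a customer's address change is joined with the old address, even if it is processed after the change.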

    However, be aware that if you're using JDBC, each probe-side row goes straight through Flink to a lookup in the database, so make sure you have an index on the join key.
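To see why the index matters, here is a Python sketch of what a lookup join does for each probe-side row: it issues one point query against the external table. SQLite stands in for the JDBC database, and the table, index, and function names are hypothetical. Without the index on id, every one of these queries would scan the whole table.

```python
import sqlite3
from typing import Iterable, Iterator, Tuple


def make_customers_db(rows: Iterable[Tuple[int, str]]) -> sqlite3.Connection:
    """In-memory stand-in for the external Customers table, indexed on the join key."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
    # Without this index, every lookup below would be a full table scan.
    conn.execute("CREATE INDEX idx_customers_id ON customers (id)")
    conn.executemany("INSERT INTO customers (id, name) VALUES (?, ?)", rows)
    return conn


def lookup_join(
    orders: Iterable[Tuple[int, str]], conn: sqlite3.Connection
) -> Iterator[Tuple[str, str]]:
    for customer_id, order in orders:
        # One point query per probe-side row, as in a lookup join against JDBC.
        row = conn.execute(
            "SELECT name FROM customers WHERE id = ?", (customer_id,)
        ).fetchone()
        if row is not None:
            yield order, row[0]
```

Because each lookup reads the database at processing time, an update to a customer row is visible to the next order that looks it up.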