
Flink: Left join equivalent of Dataset API in Batch mode of DataStream API?


The Flink docs mention that the DataSet API will be deprecated in the future, so I am prototyping a migration from the DataSet API to the DataStream API in batch mode (which I believe is in beta right now).

We have this (similar) code in our codebase that uses leftOuterJoin on a DataSet:

    DataSet<SomeOutType> joined_out = datasetA
            .leftOuterJoin(datasetB, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
            .where((left) -> coalesce(left.getId(), -9999999L))
            .equalTo((right) -> right.company_id)
            .with((JoinFunction<SomeTypeA, SomeTypeB, SomeOutType>) (left, right) -> {
                SomeOutType recNew = SomeOutType.newBuilder().build();
                recNew.setCustomerId(left.getCustomerId());
                recNew.setCustomerName((right != null && right.cust_name != null) ? right.cust_name : "Blank");
                // ....
                return recNew;
            });

The problem is that I can't find a left join (or left outer join) equivalent in the DataStream API's join documentation.

Since the DataSet API is slated for complete deprecation, I assume there must now be a way to do this left outer join with the DataStream API.

Can someone please point me in the right direction? TIA


Solution

  • The relational operations on DataSets (e.g., joins) are being deprecated in favor of using the relational operations offered by the Table/SQL API, which is fully interoperable with the DataStream API.

    See https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/tableapi/#joins and https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/.

    You can either use Table connectors and perform a join directly on the tables they produce, or convert datastreams to tables before performing the join. And you can convert from tables back to datastreams if needed for further processing. Given the table/stream duality, these "conversions" don't really cost anything. See https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/data_stream_api/.

    FWIW, Flink 1.14 was just released, and it includes a number of improvements related to this topic. In particular, only in 1.14 (and beyond) can you combine the Table API with the DataStream API in batch execution mode.
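To make this concrete, here is a minimal sketch of what the equivalent left outer join could look like with the Table API and SQL in batch execution mode on Flink 1.14. The names `streamA`, `streamB`, and the field names are assumptions carried over from the question's code; adapt them to your actual types.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bounded inputs + batch semantics, matching the old DataSet behavior.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// streamA / streamB are the DataStream counterparts of datasetA / datasetB.
tEnv.createTemporaryView("A", streamA);
tEnv.createTemporaryView("B", streamB);

// COALESCE on the key mirrors the null handling in the original .where() clause,
// and COALESCE on cust_name replaces the null check in the JoinFunction.
Table joined = tEnv.sqlQuery(
    "SELECT a.customerId, COALESCE(b.cust_name, 'Blank') AS customerName " +
    "FROM A a LEFT JOIN B b ON COALESCE(a.id, -9999999) = b.company_id");

// Convert back to a DataStream for any further DataStream-API processing.
DataStream<Row> result = tEnv.toDataStream(joined);
```

The same join can also be expressed with the Table API's `leftOuterJoin(Table, Expression)` method instead of SQL; which to use is mostly a matter of taste.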