join apache-flink data-stream

Store an element X of an unbounded stream until the Y element arrives, then join them


I'm wondering whether the following problem can be solved with Apache Flink.

Suppose I have a database with two tables: "clients" and "orders". The "orders" table contains a foreign key called "client_id_fk" that refers to the primary key of the "clients" table, called "client_id". Now, suppose I capture the events that happen on these tables (inserts, updates and deletes) and pass all of them to an unbounded Flink DataStream. As events arrive in the DataStream, the job writes them to another storage system, e.g. an Apache Kafka topic.

Suppose that five new records are added to "clients": A, B, C, D, E, and that the Flink DataStream receives them in exactly that order. Now suppose that the record O_A, which refers to client A, is added to "orders"; this event is also pushed into the Flink DataStream. As described above, every event is normally written directly to the Kafka topic as soon as it arrives in the DataStream. Here is my question: is there a way in Apache Flink to store the event A until the event O_A arrives, then join them, and then write the result to the Kafka topic? Clearly, this means I have to wait for O_A before writing A to the topic.

To recap, the Flink DataStream receives the following events in the following order:

A, B, C, D, E, O_A

When it receives the event A, instead of writing it to the Apache Kafka topic, it should store it and wait until the event O_A arrives; then it should join the two and write the result to the topic.

Thank you all in advance. Would it also be possible to have an intuitive snippet, written with the Java Apache Flink API, that shows how to implement this?


Solution

  • Yes, this is certainly doable with Flink. Using the DataStream API, you would accomplish this by keying the client stream by client_id and the orders stream by client_id_fk, then connecting those two keyed streams and processing them with a KeyedCoProcessFunction. In this operator you would use keyed state to store the client records until the matching orders arrive.

    The Flink Training from Ververica includes a couple of examples of how to implement a join that is pretty much the same. You'll find those examples here and here, but to better understand them, I recommend starting from the beginning.
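
    As a rough illustration of the approach described above, here is a minimal sketch, not a definitive implementation. It assumes the Flink 1.x DataStream API (where `open(Configuration)` is the lifecycle hook), and it models both clients and orders as hypothetical `Tuple2<String, String>` records of (key, payload); the class names `ClientOrderJoinJob` and `ClientOrderJoin`, the sample elements, and the `print()` sink are all placeholders. Because the arrival order across two streams is generally not guaranteed, the sketch also buffers orders that show up before their client.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ClientOrderJoinJob {

    /** Joins clients (input 1) with orders (input 2); both are keyed by the client id. */
    public static class ClientOrderJoin extends KeyedCoProcessFunction<
            String, Tuple2<String, String>, Tuple2<String, String>, String> {

        // Keyed state: one client record and a list of waiting orders per client id.
        private transient ValueState<Tuple2<String, String>> clientState;
        private transient ListState<Tuple2<String, String>> pendingOrders;

        @Override
        public void open(Configuration parameters) {
            TypeInformation<Tuple2<String, String>> rowType =
                    TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
            clientState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("client", rowType));
            pendingOrders = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("pending-orders", rowType));
        }

        @Override
        public void processElement1(Tuple2<String, String> client, Context ctx,
                                    Collector<String> out) throws Exception {
            // Store the client instead of emitting it right away.
            clientState.update(client);
            // Emit joins for any orders that arrived before this client.
            Iterable<Tuple2<String, String>> waiting = pendingOrders.get();
            if (waiting != null) {
                for (Tuple2<String, String> order : waiting) {
                    out.collect(client.f1 + " + " + order.f1);
                }
            }
            pendingOrders.clear();
        }

        @Override
        public void processElement2(Tuple2<String, String> order, Context ctx,
                                    Collector<String> out) throws Exception {
            Tuple2<String, String> client = clientState.value();
            if (client != null) {
                // The client is already stored: join and emit.
                out.collect(client.f1 + " + " + order.f1);
            } else {
                // The order arrived first: keep it until the client shows up.
                pendingOrders.add(order);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder sources; a real job would read the change events instead.
        DataStream<Tuple2<String, String>> clients = env.fromElements(
                Tuple2.of("A", "client A"), Tuple2.of("B", "client B"));
        DataStream<Tuple2<String, String>> orders = env.fromElements(
                Tuple2.of("A", "order O_A"));  // f0 plays the role of client_id_fk

        clients.keyBy(c -> c.f0)
                .connect(orders.keyBy(o -> o.f0))
                .process(new ClientOrderJoin())
                .print();  // placeholder for a Kafka sink

        env.execute("client-order-join");
    }
}
```

    In this sketch the client record stays in state after the first join so that later orders for the same client can still be matched. That means state grows with the number of clients; whether to clear it after a join, or expire it with a timer or state TTL, depends on the requirements of the job.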