Search code examples
apache-flinkflink-streaming

Stream Joins for Large Time Windows with Flink


I need to join two event sources based on a key. The gap between the events can be up to 1 year(ie. event1 with id1 may arrive today and the corresponding event2 with id1 from the second event source may arrive a year later). Assume I want to just stream out the joined event output.

I am exploring the option of using Flink with the RocksDB backend(I came across Table APIs which appear to suit my use case). I am not able to find references architectures that do this kind of long window joins. I am expecting the system to process about 200M events a day.

Questions:

  1. Are there any obvious limitations/pitfalls of using Flink for this kind of Long Window joins?

  2. Any recommendations on handling this kind of long window joins

Related: I am also exploring using Lambda with DynamoDB as the state to do stream joins(Related Question). I will be using managed AWS services if this info is relevant.


Solution

  • The obvious challenge of this use case are the large join window size of one year and the high ingestion rate which can result in a huge state size.

    The main question here is whether this is a 1:1 join, i.e., whether a record from stream A joins exactly (or at most) once with a record from stream B. This is important, because if you have a 1:1 join, you can remove a record from the state as soon as it was joined with another record and you don't need to keep it around for the full year. Hence, your state only stores records that were not joined yet. Assuming that the majority of records is quickly joined, your state might remain reasonable small.

    If you have a 1:1 join, the time-window joins of Flink's Table API (and SQL) and the Interval join of the DataStream API are not what you want. They are implemented as m:n joins because every record might join with more than one record of the other input. Hence they keep all records for the full window interval, i.e., for one year in your use case. If you have a 1:1 join, you should implement the join yourself as a KeyedCoProcessFunction.

    If every record can join multiple times within one year, there's no way around buffering these records. In this case, you can use the time-window joins of Flink's Table API (and SQL) and the Interval join of the DataStream API.