scala apache-flink flink-streaming

Confused about intervalJoin


I'm trying to come up with a solution that applies some logic after the join operation to pick one EventB from streamB among several matching EventBs. It would be like a reduce function, except that it sees all the elements at once and returns a single element, instead of combining them incrementally. The end result would be a single (EventA, EventB) pair instead of the cross product of 1 EventA and multiple EventBs.

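import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// For every EventA, emit one pair per EventB with the same key whose
// timestamp lies in [a.ts - 30 days, a.ts] (interval joins use event time)
streamA
  .keyBy((a: EventA) => a.common_key)
  .intervalJoin(streamB.keyBy((b: EventB) => b.common_key))
  .between(Time.days(-30), Time.days(0))
  .process(new MyJoinFunction)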

The data would arrive in this order (assuming all events share the same key):

EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000

Each EventA key is guaranteed to arrive only once.

Assume the join above matches 1 EventA with 4 EventBs, and all four pairs are collected in MyJoinFunction. What I want to do next is access all of these values at once and apply some logic to match the EventA to exactly one EventB. For example, for the dataset above I need (EventA 1616686390000, EventB 1616686387000).
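To make the "cross product" concrete, here is a plain-Scala model (no Flink involved) of the condition that `between(Time.days(-30), Time.days(0))` applies: an EventB matches when its timestamp is in the inclusive range [a.ts - 30 days, a.ts]. The case class fields `common_key` and `ts` and the object name `IntervalJoinModel` are assumptions for illustration only.

```scala
// Plain-Scala model of the interval-join condition; this is not Flink code.
object IntervalJoinModel {
  // Hypothetical event shapes: `ts` stands in for the event-time timestamp.
  final case class EventA(common_key: String, ts: Long)
  final case class EventB(common_key: String, ts: Long)

  val ThirtyDaysMs: Long = 30L * 24 * 60 * 60 * 1000

  // Keep b when keys match and a.ts - 30 days <= b.ts <= a.ts (both bounds inclusive).
  def joinedPairs(a: EventA, bs: Seq[EventB]): Seq[(EventA, EventB)] =
    bs.filter(b => b.common_key == a.common_key &&
                   b.ts >= a.ts - ThirtyDaysMs && b.ts <= a.ts)
      .map(b => (a, b))

  def main(args: Array[String]): Unit = {
    val bs = Seq(1616686386000L, 1616686387000L, 1616686388000L, 1616686389000L)
      .map(EventB("k", _))
    val a = EventA("k", 1616686390000L)
    // Prints four pairs, one per EventB: the join itself never reduces them to one.
    joinedPairs(a, bs).foreach(println)
  }
}
```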

MyJoinFunction is invoked once for each (EventA, EventB) pair, but I'd like an operation after it that gives me an iterator, so I can look through all the EventBs for each EventA.
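The question doesn't show MyJoinFunction, so the following is a hypothetical minimal version, assuming it simply forwards each pair. It illustrates why there is no iterator: Flink's `ProcessJoinFunction.processElement` receives exactly one left element and one right element per call. The `EventA`/`EventB` case classes are assumed shapes, not taken from the question.

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

// Hypothetical event types standing in for the question's EventA / EventB.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

// Flink calls processElement once per matching (left, right) pair; a single
// call never sees the other EventBs that matched the same EventA.
class MyJoinFunction extends ProcessJoinFunction[EventA, EventB, (EventA, EventB)] {
  override def processElement(
      a: EventA,
      b: EventB,
      ctx: ProcessJoinFunction[EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit =
    out.collect((a, b)) // one output per pair, so 4 matches give 4 outputs
}
```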

I am aware that I can apply another windowing operation after the join to group all the pairs, but I want this to happen immediately after the join succeeds. If possible, I'd like to avoid adding another window, since my window is already large (30 days).

Is Flink the correct choice for this use case, or am I completely wrong?


Solution

  • This could be implemented as a KeyedCoProcessFunction. You would key both streams by their common key, connect them, and process both streams together.

    You can use ListState to store the events from B (for a given key), and ValueState for A (again, for a given key). You can use an event time timer to know when the time has come to look through the B events in the ListState, and produce your result. Don't forget to clear the state once you are finished with it.

    If you're not familiar with this part of the Flink API, the tutorial on Process Functions should be helpful.
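A minimal sketch of that approach follows, written against the Flink 1.x Scala DataStream API. Several things are assumptions, not part of the answer: the case class shapes, the `String` key type, that `ts` equals the event-time timestamp with watermarks assigned upstream, the class name `PickOneB`, and the `choose` rule (a placeholder that picks the latest EventB; the real matching logic is yours). The timer at `a.ts` fires once the watermark passes it, so every EventB with `ts <= a.ts` should already be buffered. A second, per-EventB timer at `b.ts + 30 days` drops EventBs that can no longer match anything, so state does not grow without bound for keys that never see an EventA.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical event types: `ts` is assumed to be each event's event-time timestamp.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

class PickOneB extends KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)] {
  private val WindowMs: Long = 30L * 24 * 60 * 60 * 1000 // 30 days

  @transient private var aState: ValueState[EventA] = _ // the key's single EventA
  @transient private var bState: ListState[EventB] = _  // buffered EventBs for the key

  override def open(parameters: Configuration): Unit = {
    aState = getRuntimeContext.getState(
      new ValueStateDescriptor[EventA]("eventA", createTypeInformation[EventA]))
    bState = getRuntimeContext.getListState(
      new ListStateDescriptor[EventB]("eventBs", createTypeInformation[EventB]))
  }

  // Placeholder matching rule (hypothetical): take the latest EventB. Replace with your own logic.
  def choose(a: EventA, candidates: Seq[EventB]): Option[EventB] =
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.ts))

  override def processElement1(
      a: EventA,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    aState.update(a)
    // Fires once the watermark reaches a.ts, i.e. no more EventBs with ts <= a.ts are expected.
    ctx.timerService().registerEventTimeTimer(a.ts)
  }

  override def processElement2(
      b: EventB,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    bState.add(b)
    // Cleanup timer: after b.ts + 30 days this EventB cannot match any EventA that may still arrive.
    ctx.timerService().registerEventTimeTimer(b.ts + WindowMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#OnTimerContext,
      out: Collector[(EventA, EventB)]): Unit = {
    val a = aState.value()
    val bs = Option(bState.get()).map(_.asScala.toList).getOrElse(Nil)
    if (a != null && timestamp == a.ts) {
      // All candidates in [a.ts - 30 days, a.ts] are buffered: pick one, then clear the key's state.
      val candidates = bs.filter(b => b.ts >= a.ts - WindowMs && b.ts <= a.ts)
      choose(a, candidates).foreach(b => out.collect((a, b)))
      aState.clear()
      bState.clear()
    } else {
      // A cleanup timer fired: drop EventBs too old to match any future EventA.
      bState.update(bs.filter(b => b.ts + WindowMs > timestamp).asJava)
    }
  }
}
```

Wiring it up would look roughly like `streamA.keyBy(_.common_key).connect(streamB.keyBy(_.common_key)).process(new PickOneB)`. Because the result is emitted as soon as the watermark passes the EventA's timestamp, no second 30-day window is needed after the join.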