I'm trying to come up with a solution that applies some logic after the join operation to pick one event from streamB among multiple EventBs. It would be like a reduce function, but it returns only one element instead of working incrementally. So the end result would be a single (EventA, EventB) pair instead of a cross product of one EventA and multiple EventBs.
    streamA
      .keyBy((a: EventA) => a.common_key)
      .intervalJoin(
        streamB
          .keyBy((b: EventB) => b.common_key)
      )
      .between(Time.days(-30), Time.days(0))
      .process(new MyJoinFunction)
The data would be ingested like this (assuming all events have the same key):
EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000
Each EventA key is guaranteed to arrive only once.
Assume a join operation like the one above generated 1 EventA with 4 EventBs, successfully joined and collected in MyJoinFunction. What I want to do now is access these values all at once and apply some logic to match the EventA to exactly one EventB.
For example, for the above dataset I need (EventA 1616686390000, EventB 1616686387000).
MyJoinFunction will be invoked for each (EventA, EventB) pair, but I'd like an operation after this that gives me an iterator, so I can look through all the EventB events for each EventA.
I am aware that I can apply another windowing operation after the join to group all the pairs, but I want this to happen immediately after the join succeeds. So if possible, I'd like to avoid adding another window since my window is already large (30 days).
Is Flink the right choice for this use case, or am I on completely the wrong track?
This could be implemented as a KeyedCoProcessFunction. You would key both streams by their common key, connect them, and process both streams together.
You can use ListState to store the events from B (for a given key), and ValueState for A (again, for a given key). You can use an event time timer to know when the time has come to look through the B events in the ListState and produce your result. Don't forget to clear the state once you are finished with it.
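The approach described above could be sketched roughly as follows. This is a minimal sketch, not a tested implementation: it assumes Flink 1.x with the Scala API, a String key, and that the ts field is the event-time timestamp assigned to each record. The EventA and EventB definitions, the class name PickOneB, and the choose method are hypothetical stand-ins, since the question does not show the event types or the selection rule.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical event types; the real definitions are not shown in the question.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

class PickOneB
    extends KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)] {

  private val thirtyDaysMs: Long = 30L * 24 * 60 * 60 * 1000

  // Keyed state: at most one EventA per key, plus all EventBs seen so far for that key.
  @transient private var pendingA: ValueState[EventA] = _
  @transient private var bufferedBs: ListState[EventB] = _

  override def open(parameters: Configuration): Unit = {
    pendingA = getRuntimeContext.getState(
      new ValueStateDescriptor[EventA]("pendingA", classOf[EventA]))
    bufferedBs = getRuntimeContext.getListState(
      new ListStateDescriptor[EventB]("bufferedBs", classOf[EventB]))
  }

  // EventA: remember it and ask to be called back once the watermark reaches
  // its timestamp, i.e. once no more EventBs with ts <= a.ts are expected.
  override def processElement1(
      a: EventA,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    pendingA.update(a)
    ctx.timerService().registerEventTimeTimer(a.ts)
  }

  // EventB: just buffer it until the matching EventA's timer fires.
  override def processElement2(
      b: EventB,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    bufferedBs.add(b)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#OnTimerContext,
      out: Collector[(EventA, EventB)]): Unit = {
    val a = pendingA.value()
    if (a != null) {
      // Same bounds as between(Time.days(-30), Time.days(0)) in the question.
      val candidates = Option(bufferedBs.get())
        .map(_.asScala.toSeq)
        .getOrElse(Seq.empty)
        .filter(b => b.ts >= a.ts - thirtyDaysMs && b.ts <= a.ts)
      choose(a, candidates).foreach(b => out.collect((a, b)))
    }
    // Each EventA key arrives only once, so the state can be dropped here.
    pendingA.clear()
    bufferedBs.clear()
  }

  // Placeholder selection rule (picks the latest EventB); replace with the real logic.
  protected def choose(a: EventA, bs: Seq[EventB]): Option[EventB] =
    if (bs.isEmpty) None else Some(bs.maxBy(_.ts))
}
```

Wiring it up would look something like streamA.keyBy(_.common_key).connect(streamB.keyBy(_.common_key)).process(new PickOneB). Note that in this sketch EventBs for a key that never sees an EventA stay in state indefinitely, so you would probably also want a cleanup timer or state TTL to expire them after 30 days.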
If you're not familiar with this part of the Flink API, the tutorial on Process Functions should be helpful.