Tags: join, apache-flink, flink-streaming

How to use Apache Flink DataStream API to output stream of event pairs?


A shoe (an event) is defined by its color and an isLeft flag (isLeft = true for a left shoe, false for a right shoe).

Tuple2<String, Boolean> leftBlueShoe  = Tuple2.of("blue", true);
Tuple2<String, Boolean> rightBlueShoe = Tuple2.of("blue", false);
// unbounded stream of shoes is as follows
DataStream<Tuple2<String, Boolean>> streamOfShoes = ... 
// something like: env.fromElements(leftBlueShoe, rightRedShoe, leftGreenShoe, rightBlueShoe, ...);

How can I form pairs of same-colored shoes so that a matched pair is emitted immediately, while an unmatched shoe waits for its partner until the end of the window?

DataStream<Tuple5<String, Boolean, String, Boolean, String>> shoePairs = ...
// a few events from the shoePairs stream:
Tuple5<String, Boolean, String, Boolean, String> shoePair    = Tuple5.of("blue", true, "blue", false, "pairFound");
Tuple5<String, Boolean, String, Boolean, String> notShoePair = Tuple5.of("red", true, "red", false, "pairNotFound"); // even if no pair is found within the window, the event is tagged and kept in the stream

Approaches tried (ignore this to avoid confusion):

  1. Splitting the stream into left and right streams and using a windowed join (does this incur a cost?)

  2. Windowing the same stream with TumblingProcessingTimeWindows and custom join logic in apply(). This window is not triggered immediately, even when all events have already been paired.


Solution

  • One of the exercises in the Flink training is about finding event pairs; it's similar in spirit to what you're asking for. See the Rides and Fares Exercise, which uses a RichCoFlatMapFunction to do the pairing.

    The solution there assumes that perfect pairing is always possible, so it doesn't tackle the case of unmatched pairs. But you can find a variation here that takes this a step further. This example uses the timers in a CoProcessFunction to detect unmatched pairs.
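
    The pairing-plus-timeout idea described above can be sketched without any Flink dependency. The following is only a plain-Java model, not Flink code: the class ShoePairer, its methods onShoe and onTimer, and the output strings are hypothetical names chosen for illustration. The map stands in for per-key state, onShoe plays the role of the element callback, and onTimer plays the role of a timer firing. It assumes at most one waiting shoe per color; a second shoe of the same side is reported as unmatched right away, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Library-free model of keyed pairing with a timeout; not Flink code. */
final class ShoePairer {
    /** A shoe waiting for its partner (hypothetical helper type). */
    private static final class Waiting {
        final boolean isLeft;
        final long deadline;
        Waiting(boolean isLeft, long deadline) {
            this.isLeft = isLeft;
            this.deadline = deadline;
        }
    }

    private final long timeoutMs;
    // Stands in for per-key state; insertion order keeps the output deterministic.
    private final Map<String, Waiting> waitingByColor = new LinkedHashMap<>();
    final List<String> output = new ArrayList<>();

    ShoePairer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Handles one shoe: emit a pair at once if the partner is already waiting. */
    void onShoe(String color, boolean isLeft, long now) {
        Waiting waiting = waitingByColor.get(color);
        if (waiting == null) {
            // First shoe of this color: remember it and note when it should give up.
            waitingByColor.put(color, new Waiting(isLeft, now + timeoutMs));
        } else if (waiting.isLeft != isLeft) {
            // Opposite side is waiting: emit the pair immediately and clear the state.
            waitingByColor.remove(color);
            output.add(color + ":pairFound");
        } else {
            // Simplification: a same-side duplicate is reported as unmatched right away.
            output.add(color + ":pairNotFound");
        }
    }

    /** Models timers firing: every shoe whose deadline has passed is emitted as unmatched. */
    void onTimer(long now) {
        Iterator<Map.Entry<String, Waiting>> it = waitingByColor.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Waiting> entry = it.next();
            if (entry.getValue().deadline <= now) {
                output.add(entry.getKey() + ":pairNotFound");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ShoePairer pairer = new ShoePairer(1_000);
        pairer.onShoe("blue", true, 0);
        pairer.onShoe("red", true, 10);
        pairer.onShoe("blue", false, 20); // blue pair is emitted here, not at the timeout
        pairer.onTimer(1_010);            // red never found a partner
        System.out.println(pairer.output); // prints [blue:pairFound, red:pairNotFound]
    }
}
```

    In a real job the state and timers would be scoped to the current key by the framework, so there would be no loop over all colors as in onTimer here.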

    Other points:

    Splitting the stream into left and right sub-streams should have negligible cost.

    I think a CoGroupFunction should work. If you tried this and it didn't seem to work, perhaps you were using event time windowing and the final watermark was missing, preventing the window from being closed.

    Update:

    Having looked at your code, I see an issue in the implementation. Your timestamp extractor is using the system clock rather than timestamps in the events. This will give you something similar to (but worse than) using processing time. I say "worse than processing time" because you are allowing for events to be out-of-order, which adds latency, and it prevents a window from closing until an event sufficiently beyond the window's end point arrives. This means that the last window can never be triggered.
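
    To make the arithmetic concrete, here is a small plain-Java model of that behavior. It is a simplification, not Flink's implementation: it assumes the watermark trails the largest timestamp seen so far by the allowed out-of-orderness, and that a window fires once the watermark reaches its end (the exact off-by-one conventions are ignored). The class and method names are hypothetical.

```java
/** Simplified model of bounded-out-of-orderness watermarks; not Flink code. */
final class WatermarkModel {
    /** Assumed rule: the watermark trails the largest timestamp seen by the allowed delay. */
    static long watermark(long maxTimestampSeen, long maxOutOfOrderness) {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    /** Assumed rule: a window fires once the watermark reaches its end. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000;
        long delay = 2_000;
        // The last event of the window arrives at t = 9,900: the watermark is only 7,900.
        System.out.println(windowFires(windowEnd, watermark(9_900, delay)));  // prints false
        // Only an event at t >= 12,000 pushes the watermark to the window's end.
        System.out.println(windowFires(windowEnd, watermark(12_000, delay))); // prints true
    }
}
```

    If no event with a large enough timestamp ever arrives, the second case never happens, which is why the last window stays open.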

    As a test, try switching the time characteristic to processing time and removing assignTimestampsAndWatermarks, then see if the CoGroupFunction works correctly. You could also use ingestion time, so long as you remove your watermarking and let Flink handle it (with processing time, watermarking is irrelevant; with ingestion time, Flink does the watermarking for you, unless you override it).

    If you want to use event time in your application, use finite sources in your testing. When finite sources (such as reading from a file or collection) reach the end of their input, they send a very large watermark through the job, which closes any open windows.
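
    The effect of that final watermark can be illustrated with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink code: it assumes tumbling windows, non-negative in-order timestamps, a watermark that trails the largest timestamp by a fixed delay, and a finite source that emits a watermark of Long.MAX_VALUE at the end of its input. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified simulation of tumbling event-time windows; not Flink code. */
final class FinalWatermarkModel {
    /** Returns the end timestamps of the windows that close, in the order they close. */
    static List<Long> closedWindows(long[] timestamps, long windowSize, long delay,
                                    boolean finiteSource) {
        TreeSet<Long> openWindowEnds = new TreeSet<>();
        List<Long> closed = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Assign the event to its tumbling window, identified by the window's end.
            openWindowEnds.add(ts - (ts % windowSize) + windowSize);
            maxTimestamp = Math.max(maxTimestamp, ts);
            fire(openWindowEnds, maxTimestamp - delay, closed);
        }
        if (finiteSource) {
            // End of input: a very large final watermark closes everything still open.
            fire(openWindowEnds, Long.MAX_VALUE, closed);
        }
        return closed;
    }

    /** Closes every open window whose end is at or below the watermark. */
    private static void fire(TreeSet<Long> open, long watermark, List<Long> closed) {
        while (!open.isEmpty() && open.first() <= watermark) {
            closed.add(open.pollFirst());
        }
    }

    public static void main(String[] args) {
        long[] events = {1_000, 4_000, 11_000, 14_000};
        // Unbounded source: the window ending at 20,000 never closes.
        System.out.println(closedWindows(events, 10_000, 2_000, false)); // prints [10000]
        // Finite source: the final watermark closes it.
        System.out.println(closedWindows(events, 10_000, 2_000, true));  // prints [10000, 20000]
    }
}
```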