Tags: apache-flink, correlation, flink-streaming

Find the stream of events that are not grouped when using CoGroupFunction


How can we find the stream of events that are not matched with other events when using CoGroupFunction?

Let's consider people communicating over phone calls. In Tuple2<String, Integer>, f0 is the name of the person and f1 is the phone number they are calling or receiving a call from. We have paired them using coGroup, but we are missing the people whose counterpart is outside our known world, that is, people with no matching event in the other stream.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Integer>> callers = env.fromElements(
        new Tuple2<String, Integer>("alice->", 12), // alice dials 12
        new Tuple2<String, Integer>("bob->", 13),   // bob dials 13
        new Tuple2<String, Integer>("charlie->", 19))
        .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));

DataStream<Tuple2<String, Integer>> callees = env.fromElements(
        new Tuple2<String, Integer>("->carl", 12), // carl received call
        new Tuple2<String, Integer>("->ted", 13),
        new Tuple2<String, Integer>("->chris", 7))
        .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));

DataStream<Tuple1<String>> groupedStream = callers.coGroup(callees)
        .where(evt -> evt.f1).equalTo(evt -> evt.f1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new IntEqualCoGroupFunc());

groupedStream.print(); // prints 1> (alice->-->carl) \n 1> (bob->-->ted)

//DataStream<Tuple1<String>> notGroupedStream = ..; // people without pairs in last window
//notGroupedStream.print(); // should print charlie->-->someone \n someone->-->chris

env.execute();

Solution

  • The simplest solution seems to be changing IntEqualCoGroupFunc so that it returns (Boolean, String) instead of String. This works because coGroup also processes elements that have no matching key: for such a key, one of the two Iterables passed to coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) is empty. In your case callers is the first input and callees is the second, so for key 7 the function would receive an empty Iterable as first and ("->chris", 7) in second.

    Changing the signature lets you emit the results that have no matching key as well, and then split them into separate streams at a later stage of processing.

    // Implementation of IntEqualCoGroupFunc
    @Override
    public void coGroup(Iterable<Tuple2<String, Integer>> outbound, Iterable<Tuple2<String, Integer>> inbound,
            Collector<Tuple1<String>> out) throws Exception {
    
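        for (Tuple2<String, Integer> outboundObj : outbound) {
            for (Tuple2<String, Integer> inboundObj : inbound) {
                // Both sides present: emit the first matching pair only.
                out.collect(Tuple1.of(outboundObj.f0 + "-" + inboundObj.f0));
                return;
            }
            // inbound is empty: the caller has no matching callee in this window.
            out.collect(Tuple1.of(outboundObj.f0 + "->someone"));
            return;
        }
    
        // outbound is empty: the callee has no matching caller in this window.
        for (Tuple2<String, Integer> inboundObj : inbound) {
            out.collect(Tuple1.of("someone->-" + inboundObj.f0));
            return;
        }
        // Both sides empty: defensive fallback only. coGroup is invoked for keys
        // that have at least one element, so this line should not be reached.
        out.collect(Tuple1.of("someone->-->someone"));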
    }
    

    The output is as follows:

    2> (someone->-->chris)
    2> (charlie->->someone)
    1> (alice->-->carl)
    1> (bob->-->ted)
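
The (Boolean, String) idea from the answer can be tried out without a Flink cluster. The following is only a minimal plain-Java sketch of the logic, not Flink code: the classes Call and Result and the methods run and select are hypothetical stand-ins (for Tuple2<String, Integer>, the flagged result, the per-key invocation of coGroup, and the later split). It treats all elements as belonging to one window and, unlike the function above, emits every pair for a key rather than only the first one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch (no Flink classes): mimics coGroup being called once per key,
// with one side possibly empty, and flags each result as matched or unmatched.
final class CoGroupSketch {

    /** Hypothetical stand-in for Tuple2<String, Integer>. */
    static final class Call {
        final String name;
        final int number;
        Call(String name, int number) { this.name = name; this.number = number; }
    }

    /** Hypothetical stand-in for the suggested (Boolean, String) result. */
    static final class Result {
        final boolean matched;
        final String text;
        Result(boolean matched, String text) { this.matched = matched; this.text = text; }
    }

    /** Co-group body for a single key; the flag records whether both sides were present. */
    static void coGroup(List<Call> outbound, List<Call> inbound, List<Result> out) {
        if (!outbound.isEmpty() && !inbound.isEmpty()) {
            for (Call o : outbound) {
                for (Call i : inbound) {
                    out.add(new Result(true, o.name + "-" + i.name));
                }
            }
        } else if (!outbound.isEmpty()) {
            for (Call o : outbound) {
                out.add(new Result(false, o.name + "->someone"));
            }
        } else {
            for (Call i : inbound) {
                out.add(new Result(false, "someone->-" + i.name));
            }
        }
    }

    /** Groups both inputs by number and calls coGroup once per key seen on either side. */
    static List<Result> run(List<Call> callers, List<Call> callees) {
        Map<Integer, List<Call>> left = new LinkedHashMap<>();
        Map<Integer, List<Call>> right = new LinkedHashMap<>();
        for (Call c : callers) left.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);
        for (Call c : callees) right.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);

        Set<Integer> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<Result> out = new ArrayList<>();
        for (Integer key : keys) {
            List<Call> outbound = left.getOrDefault(key, Collections.emptyList());
            List<Call> inbound = right.getOrDefault(key, Collections.emptyList());
            coGroup(outbound, inbound, out);
        }
        return out;
    }

    /** The "split at a later stage": keeps only results whose flag equals the argument. */
    static List<String> select(List<Result> results, boolean matched) {
        List<String> texts = new ArrayList<>();
        for (Result r : results) {
            if (r.matched == matched) texts.add(r.text);
        }
        return texts;
    }

    public static void main(String[] args) {
        List<Call> callers = Arrays.asList(
                new Call("alice->", 12), new Call("bob->", 13), new Call("charlie->", 19));
        List<Call> callees = Arrays.asList(
                new Call("->carl", 12), new Call("->ted", 13), new Call("->chris", 7));
        List<Result> results = run(callers, callees);
        System.out.println("grouped:     " + select(results, true));
        System.out.println("not grouped: " + select(results, false));
    }
}
```

With the sample data from the question, the matched list contains alice->-->carl and bob->-->ted, and the unmatched list contains charlie->->someone and someone->-->chris. In the actual job, the same split could presumably be done by filtering the flagged output stream twice on the Boolean field, once for each value.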