How can we find the stream of events that are not matched with other events when using CoGroupFunction?
Let's consider people communicating over phone calls. In Tuple2<String, Integer>, f0 is the name of the person and f1 is the phone number they are calling or receiving a call from.
We have paired them using coGroup; however, we are missing the people whose call has no counterpart in the other stream, for example someone receiving a call from a person outside our data.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Integer>> callers = env.fromElements(
new Tuple2<String, Integer>("alice->", 12), // alice dials 12
new Tuple2<String, Integer>("bob->", 13), // bob dials 13
new Tuple2<String, Integer>("charlie->", 19))
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));
DataStream<Tuple2<String, Integer>> callees = env.fromElements(
new Tuple2<String, Integer>("->carl", 12), // carl received call
new Tuple2<String, Integer>("->ted", 13),
new Tuple2<String, Integer>("->chris", 7))
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));
DataStream<Tuple1<String>> groupedStream = callers.coGroup(callees)
.where(evt -> evt.f1).equalTo(evt -> evt.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new IntEqualCoGroupFunc());
groupedStream.print(); // prints 1> (alice->-->carl) \n 1> (bob->-->ted)
//DataStream<Tuple1<String>> notGroupedStream = ..; // people without pairs in last window
//notGroupedStream.print(); // should print charlie->-->someone \n someone->-->chris
env.execute();
The simplest solution seems to be changing IntEqualCoGroupFunc so that instead of a String it returns (Boolean, String).
This works because coGroup also processes elements that have no matching key. For those elements, one of the two Iterables passed to coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) is empty. In your case callees is the second input, so for key 7 the function would receive an empty Iterable as first and ("->chris", 7) in second.
With the changed signature you can also emit the results that have no matching key and split them into separate streams at a later stage of processing.
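To make the tagging idea concrete, here is a minimal sketch in plain Java. It does not use the Flink API: the class CoGroupSketch, the types Call and Tagged, and the method coGroupAll are all hypothetical names. They only simulate, for a single window, how a coGroup function is called once per key with two collections, either of which may be empty. It also assumes that every element of a group should be emitted, which may not be what you want.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CoGroupSketch {

    /** A call event: a label plus the phone number used as the join key. */
    public static final class Call {
        public final String name;
        public final int number;
        public Call(String name, int number) { this.name = name; this.number = number; }
    }

    /** A result tagged with whether both sides were present for its key. */
    public static final class Tagged {
        public final boolean matched;
        public final String text;
        public Tagged(boolean matched, String text) { this.matched = matched; this.text = text; }
    }

    /** Stand-in for the body of a coGroup function: handles one key, either side may be empty. */
    public static void coGroup(List<Call> outbound, List<Call> inbound, List<Tagged> out) {
        if (outbound.isEmpty()) {
            // only callees for this key: tag as unmatched
            for (Call in : inbound) out.add(new Tagged(false, "someone->-" + in.name));
        } else if (inbound.isEmpty()) {
            // only callers for this key: tag as unmatched
            for (Call o : outbound) out.add(new Tagged(false, o.name + "->someone"));
        } else {
            // both sides present: emit every pair, tagged as matched
            for (Call o : outbound)
                for (Call in : inbound)
                    out.add(new Tagged(true, o.name + "-" + in.name));
        }
    }

    /** Simulates one window: groups both inputs by number and calls coGroup once per key. */
    public static List<Tagged> coGroupAll(List<Call> callers, List<Call> callees) {
        Map<Integer, List<Call>> first = new LinkedHashMap<>();
        Map<Integer, List<Call>> second = new LinkedHashMap<>();
        for (Call c : callers) first.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);
        for (Call c : callees) second.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);

        // union of the keys seen on either side, in first-seen order
        Set<Integer> keys = new LinkedHashSet<>(first.keySet());
        keys.addAll(second.keySet());

        List<Tagged> out = new ArrayList<>();
        for (Integer key : keys) {
            coGroup(first.getOrDefault(key, Collections.<Call>emptyList()),
                    second.getOrDefault(key, Collections.<Call>emptyList()), out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Call> callers = Arrays.asList(
                new Call("alice->", 12), new Call("bob->", 13), new Call("charlie->", 19));
        List<Call> callees = Arrays.asList(
                new Call("->carl", 12), new Call("->ted", 13), new Call("->chris", 7));

        // "split at a later stage": route each result by its boolean tag
        for (Tagged t : coGroupAll(callers, callees)) {
            System.out.println((t.matched ? "matched:   " : "unmatched: ") + t.text);
        }
    }
}
```

In an actual Flink job, the same split could presumably be done by applying two filters on the boolean field of the coGroup result, one keeping the matched records and one keeping the unmatched ones.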
// Implementation of IntEqualCoGroupFunc
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> outbound, Iterable<Tuple2<String, Integer>> inbound,
Collector<Tuple1<String>> out) throws Exception {
for (Tuple2<String, Integer> outboundObj : outbound) {
for (Tuple2<String, Integer> inboundObj : inbound) {
out.collect(Tuple1.of(outboundObj.f0 + "-" + inboundObj.f0)); //matching pair
return;
}
out.collect(Tuple1.of(outboundObj.f0 + "->someone")); //inbound is empty
return;
}
// outbound is empty
for (Tuple2<String, Integer> inboundObj : inbound) {
out.collect(Tuple1.of("someone->-" + inboundObj.f0));
return;
}
// both sides empty: not expected in practice, since coGroup is called only for keys that have at least one element
out.collect(Tuple1.of("someone->-->someone"));
}
The output is as follows:
2> (someone->-->chris)
2> (charlie->->someone)
1> (alice->-->carl)
1> (bob->-->ted)