I want to perform an outer join on two data streams, preferably without putting them in a window (from what I've seen, coGroup always comes with a window).
I tried this:
```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Both inputs are keyed on (id, mex), so records that should match share keyed state.
val controlStream = Flink.flinkEnv.fromElements(
  (1, "mex1", "stream1_feat1"),
  (1, "mex2", "stream1_feat2")
).keyBy(x => (x._1, x._2))
val wordStream = Flink.flinkEnv.fromElements(
  (1, "mex1", "stream2_feat1"),
  (1, "mex3", "stream2_feat3")
).keyBy(x => (x._1, x._2))
val filteredStream = controlStream
  .connect(wordStream)
  .flatMap(new ControlFunction)
////////////////////////////////////////////////////////////////////////
class ControlFunction extends RichCoFlatMapFunction[
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {
  // outer join
  private var state1: ValueState[(Int, String, String)] = _
  private var state2: ValueState[(Int, String, String)] = _

  override def open(parameters: Configuration): Unit = {
    state1 = getRuntimeContext.getState(
      new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))
    state2 = getRuntimeContext.getState(
      new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))
  }

  // Called for each record from controlStream (input 1).
  override def flatMap1(value: (Int, String, String),
                        out: Collector[(Int, String, String, String)]): Unit = {
    val state2Value = state2.value
    if (state2Value != null) {
      println("inside map1 not null")
      state2.clear()
      out.collect((value._1, value._2, value._3, state2Value._3))
    } else {
      println("inside map1 null")
      state1.update(value)
      out.collect((value._1, value._2, value._3, "NA"))
    }
  }

  // Called for each record from wordStream (input 2).
  override def flatMap2(value: (Int, String, String),
                        out: Collector[(Int, String, String, String)]): Unit = {
    val state1Value = state1.value
    if (state1Value != null) {
      println("inside map2 not null")
      state1.clear()
      out.collect((value._1, value._2, state1Value._3, value._3))
    } else {
      println("inside map2 null")
      state2.update(value)
      out.collect((value._1, value._2, "NA", value._3))
    }
  }
}
```
This gave me:

```
5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
```
The record `(1,mex1,stream1_feat1,NA)` should not be produced.
The result that I want to achieve is an outer join:

```
5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)
```
From the print statements, I found that flatMap1 and flatMap2 are invoked one after the other, so mex1 is emitted twice. Is there any way to resolve this?
Thanks in advance!
You can't expect a streaming outer join to behave the same way as a batch outer join. A batch outer join can fully scan both input tables, and will only produce output rows containing nulls when matching records do not exist. With a streaming implementation, you cannot know whether, by waiting, you might eventually receive a matching record.
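To make this concrete, here is a stdlib-only simulation of the question's ControlFunction (no Flink involved). It assumes the records from both inputs arrive interleaved in the order given, and it uses a plain Map keyed by (id, mex) as a stand-in for Flink's keyed ValueState; `NaiveJoinTrace`, `run` and `exampleEvents` are hypothetical names for illustration. When `(1, "mex1", "stream1_feat1")` arrives first, nothing from stream 2 is in state yet, so the function has to decide right then, and it emits the NA row.

```scala
// Stdlib-only simulation of the question's ControlFunction (not Flink code).
// Assumption: a Map keyed by (id, mex) stands in for Flink's keyed ValueState,
// and events from both inputs arrive interleaved in the order of the Seq.
object NaiveJoinTrace {
  type Rec = (Int, String, String)
  type Row = (Int, String, String, String)
  // Left(...) is a record from stream 1, Right(...) a record from stream 2.
  type Event = Either[Rec, Rec]

  val exampleEvents: Seq[Event] = Seq(
    Left((1, "mex1", "stream1_feat1")),
    Left((1, "mex2", "stream1_feat2")),
    Right((1, "mex1", "stream2_feat1")),
    Right((1, "mex3", "stream2_feat3"))
  )

  def run(events: Seq[Event]): List[Row] = {
    var state1 = Map.empty[(Int, String), Rec]
    var state2 = Map.empty[(Int, String), Rec]
    val out = List.newBuilder[Row]
    events.foreach {
      case Left(v @ (id, mex, feat)) =>
        state2.get((id, mex)) match {
          case Some(other) =>
            state2 -= ((id, mex))
            out += ((id, mex, feat, other._3))
          case None =>
            state1 += ((id, mex) -> v)
            // Emitted immediately: at this point there is no way to know a match is coming.
            out += ((id, mex, feat, "NA"))
        }
      case Right(v @ (id, mex, feat)) =>
        state1.get((id, mex)) match {
          case Some(other) =>
            state1 -= ((id, mex))
            out += ((id, mex, other._3, feat))
          case None =>
            state2 += ((id, mex) -> v)
            out += ((id, mex, "NA", feat))
        }
    }
    out.result()
  }

  def main(args: Array[String]): Unit =
    run(exampleEvents).foreach(println)
}
```

Running it reproduces the question's four rows, including both the provisional `(1,mex1,stream1_feat1,NA)` and the later joined mex1 row. Swapping the order so that stream 2's mex1 arrives first would instead produce a `(1,mex1,NA,stream2_feat1)` row, which shows the output depends on arrival order, not on a logic error.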
Because they cannot see the future, stream processing applications are often forced to produce an output stream whose results are updated as more information becomes available.
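One way to express "results that are updated" is a changelog: emit a provisional NA row right away, and when the match shows up, retract that row and insert the joined one. The sketch below simulates this with the Scala standard library only. The `Insert`/`Retract` types are hypothetical stand-ins (not Flink classes), and it assumes at most one record per (id, mex) key on each input, as in the question's example. Folding the changelog into a set gives the current view of the join.

```scala
// Stdlib-only simulation of an updating (retracting) full outer join.
// Assumption: at most one record per (id, mex) key on each input.
// Insert/Retract are hypothetical names, not Flink API types.
object RetractingOuterJoin {
  type Rec = (Int, String, String)
  type Row = (Int, String, String, String)
  type Event = Either[Rec, Rec] // Left = stream 1, Right = stream 2

  sealed trait Change
  final case class Insert(row: Row) extends Change
  final case class Retract(row: Row) extends Change

  val exampleEvents: Seq[Event] = Seq(
    Left((1, "mex1", "stream1_feat1")),
    Left((1, "mex2", "stream1_feat2")),
    Right((1, "mex1", "stream2_feat1")),
    Right((1, "mex3", "stream2_feat3"))
  )

  def run(events: Seq[Event]): List[Change] = {
    var left = Map.empty[(Int, String), Rec]
    var right = Map.empty[(Int, String), Rec]
    val out = List.newBuilder[Change]
    events.foreach {
      case Left(l @ (id, mex, feat)) =>
        left += ((id, mex) -> l)
        right.get((id, mex)) match {
          case Some(r) =>
            out += Retract((id, mex, "NA", r._3)) // undo the provisional row from stream 2
            out += Insert((id, mex, feat, r._3))
          case None =>
            out += Insert((id, mex, feat, "NA")) // provisional: may be retracted later
        }
      case Right(r @ (id, mex, feat)) =>
        right += ((id, mex) -> r)
        left.get((id, mex)) match {
          case Some(l) =>
            out += Retract((id, mex, l._3, "NA")) // undo the provisional row from stream 1
            out += Insert((id, mex, l._3, feat))
          case None =>
            out += Insert((id, mex, "NA", feat))
        }
    }
    out.result()
  }

  // Applying the changelog in order yields the join result as of the last event.
  def materialize(changes: Seq[Change]): Set[Row] =
    changes.foldLeft(Set.empty[Row]) {
      case (acc, Insert(row))  => acc + row
      case (acc, Retract(row)) => acc - row
    }
}
```

After all four example events, the materialized set is exactly the outer join the question asks for, while the changelog itself still contains the provisional mex1 row followed by its retraction. A downstream consumer therefore has to understand retractions for this to work.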
One thing you could do is wait for some time to see whether emitting a result containing an NA would be a mistake, but eventually you'd have to stop waiting and produce a result.
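The waiting strategy can be sketched as: buffer each unmatched record with a deadline, emit the joined row if the partner arrives before the deadline, and emit the NA row once the deadline passes. In Flink this role would typically be played by a KeyedCoProcessFunction that registers a timer per buffered record; the code below does not use Flink and only simulates that behaviour with the standard library. It assumes timestamped events, at most one record per (id, mex) key on each input, and an end-of-input flush at `Long.MaxValue`; `TimeoutOuterJoin`, `Event`, `timeout` and `exampleEvents` are hypothetical names.

```scala
// Stdlib-only simulation of a timeout-based outer join (not Flink code).
// Assumptions: events carry timestamps, at most one record per (id, mex) key
// per input, and "timers" fire when time reaches a buffered record's deadline.
object TimeoutOuterJoin {
  type Rec = (Int, String, String)
  type Row = (Int, String, String, String)
  // rec: Left = stream 1, Right = stream 2
  final case class Event(ts: Long, rec: Either[Rec, Rec])

  val exampleEvents: Seq[Event] = Seq(
    Event(0L, Left((1, "mex1", "stream1_feat1"))),
    Event(1L, Left((1, "mex2", "stream1_feat2"))),
    Event(5L, Right((1, "mex1", "stream2_feat1"))),
    Event(6L, Right((1, "mex3", "stream2_feat3")))
  )

  def run(events: Seq[Event], timeout: Long): List[Row] = {
    // Unmatched records per key, together with the time at which we give up waiting.
    var pending = Map.empty[(Int, String), (Either[Rec, Rec], Long)]
    val out = List.newBuilder[Row]

    def emitUnmatched(side: Either[Rec, Rec]): Unit = side match {
      case Left((id, mex, f))  => out += ((id, mex, f, "NA"))
      case Right((id, mex, f)) => out += ((id, mex, "NA", f))
    }

    // Fire every expired "timer" in deadline order (ties broken by key).
    def fireTimers(now: Long): Unit =
      pending.filter { case (_, (_, deadline)) => deadline <= now }
        .toSeq
        .sortBy { case (key, (_, deadline)) => (deadline, key) }
        .foreach { case (key, (side, _)) =>
          emitUnmatched(side)
          pending -= key
        }

    events.sortBy(_.ts).foreach { e =>
      fireTimers(e.ts)
      val rec: Rec = e.rec match { case Left(r) => r; case Right(r) => r }
      val key = (rec._1, rec._2)
      (pending.get(key), e.rec) match {
        case (Some((Left(l), _)), Right(r)) =>
          pending -= key
          out += ((key._1, key._2, l._3, r._3))
        case (Some((Right(r), _)), Left(l)) =>
          pending -= key
          out += ((key._1, key._2, l._3, r._3))
        case _ =>
          pending += (key -> ((e.rec, e.ts + timeout)))
      }
    }
    fireTimers(Long.MaxValue) // end of input: stop waiting for everything still buffered
    out.result()
  }
}
```

With `timeout = 10` the example produces exactly the desired outer join. With `timeout = 3` the mex1 records are more than 3 time units apart, so both sides time out and you get two NA rows for mex1 instead of one joined row: the timeout only trades latency for fewer wrong guesses, it cannot eliminate them.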
Note that Flink's Table API has an outer join, but you'll notice it's marked as "Result Updating" for the reasons explained above.