apache-flink flink-streaming

How to write outer join function in Flink using connect?


I want to perform an outer join on two data streams, preferably without putting them in a window (as far as I can tell, CoGroup always requires a window).

I tried this:

    val controlStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream1_feat1"),
      (1, "mex2", "stream1_feat2")
    ).keyBy(x => (x._1, x._2))

    val wordStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream2_feat1"),
      (1, "mex3", "stream2_feat3")
    ).keyBy(x => (x._1, x._2))

    val filteredStream = controlStream
        .connect(wordStream)
        .flatMap(new ControlFunction)

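    ////////////////////////////////////////////////////////////////////////

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    class ControlFunction extends RichCoFlatMapFunction[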
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {

    // outer join
    private var state1: ValueState[(Int, String, String)] = _
    private var state2: ValueState[(Int, String, String)] = _

    override def open(parameters: Configuration): Unit = {
      state1 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))

      state2 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))

    }

    override def flatMap1(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state2Value = state2.value

      if (state2Value != null) {
        println("inside map1 not null")
        state2.clear()
        out.collect((value._1, value._2, value._3, state2Value._3))
      } else {
        println("inside map1 null")
        state1.update(value)
        out.collect((value._1, value._2, value._3, "NA"))
      }
    }

    override def flatMap2(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state1Value = state1.value

      if (state1Value != null) {
        println("inside map2 not null")
        state1.clear()
        out.collect((value._1, value._2, state1Value._3, value._3))
      } else {
        println("inside map2 null")
        state2.update(value)
        out.collect((value._1, value._2, "NA", value._3))
      }
    }

    }

Which gave me:

5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

The record (1,mex1,stream1_feat1,NA) should not be produced. The result I want is an outer join:

5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

From the print statements, I found that the two flatMap functions were called one after the other, which caused mex1 to be emitted twice. Is there any way to resolve this?

Thanks in advance!


Solution

  • You can't expect a streaming outer join to behave the same way as a batch outer join. A batch outer join can fully scan both input tables, and will only produce output rows containing nulls when matching records do not exist. With a streaming implementation, you cannot know whether waiting longer would eventually bring a matching record.

    Because they cannot access the future, stream processing applications are often forced to produce an output stream whose results are updated as more information becomes available. That is what happened here: (1,mex1,stream1_feat1,NA) was emitted first and later superseded by (1,mex1,stream1_feat1,stream2_feat1).

    One thing you could do is wait for some time to see whether emitting a result containing an NA would be a mistake, but eventually you'd have to stop waiting and produce a result.

    Note that Flink's Table API has an outer join, but you'll notice it's marked as "Result Updating" for the reasons explained above.
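
    The "wait for some time" idea could be sketched roughly as follows. This is not code from the question: DelayedOuterJoin and waitMs are hypothetical names, the sketch assumes a Flink version that still ships the Scala DataStream API used above, and it uses processing-time timers. Like the original, it keeps at most one record per key and side.

    ```scala
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // Hypothetical alternative to ControlFunction: an unmatched record is
    // buffered and only emitted with "NA" once waitMs of processing time passes.
    class DelayedOuterJoin(waitMs: Long) extends KeyedCoProcessFunction[
        (Int, String), (Int, String, String), (Int, String, String),
        (Int, String, String, String)] {

      type In = (Int, String, String)
      type Out = (Int, String, String, String)
      type Ctx = KeyedCoProcessFunction[(Int, String), In, In, Out]#Context
      type TimerCtx = KeyedCoProcessFunction[(Int, String), In, In, Out]#OnTimerContext

      private var left: ValueState[In] = _
      private var right: ValueState[In] = _
      private var timer: ValueState[java.lang.Long] = _ // timestamp of the pending timer

      override def open(parameters: Configuration): Unit = {
        left = getRuntimeContext.getState(new ValueStateDescriptor[In](
          "left", createTypeInformation[(Int, String, String)]))
        right = getRuntimeContext.getState(new ValueStateDescriptor[In](
          "right", createTypeInformation[(Int, String, String)]))
        timer = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("timer", classOf[java.lang.Long]))
      }

      override def processElement1(value: In, ctx: Ctx, out: Collector[Out]): Unit = {
        val other = right.value
        if (other != null) {
          // Match found: emit the joined row and stop waiting.
          out.collect((value._1, value._2, value._3, other._3))
          right.clear()
          cancelTimer(ctx)
        } else {
          // No match yet: buffer instead of emitting "NA" immediately.
          left.update(value)
          startTimer(ctx)
        }
      }

      override def processElement2(value: In, ctx: Ctx, out: Collector[Out]): Unit = {
        val other = left.value
        if (other != null) {
          out.collect((value._1, value._2, other._3, value._3))
          left.clear()
          cancelTimer(ctx)
        } else {
          right.update(value)
          startTimer(ctx)
        }
      }

      // The wait is over: whatever is still buffered has no partner, so pad it.
      override def onTimer(timestamp: Long, ctx: TimerCtx, out: Collector[Out]): Unit = {
        val l = left.value
        val r = right.value
        if (l != null) out.collect((l._1, l._2, l._3, "NA"))
        if (r != null) out.collect((r._1, r._2, "NA", r._3))
        left.clear(); right.clear(); timer.clear()
      }

      private def startTimer(ctx: Ctx): Unit = {
        if (timer.value == null) { // keep a single pending timer per key
          val t = ctx.timerService().currentProcessingTime() + waitMs
          ctx.timerService().registerProcessingTimeTimer(t)
          timer.update(t)
        }
      }

      private def cancelTimer(ctx: Ctx): Unit = {
        val t = timer.value
        if (t != null) {
          ctx.timerService().deleteProcessingTimeTimer(t)
          timer.clear()
        }
      }
    }
    ```

    It would replace the flatMap call, for example controlStream.connect(wordStream).process(new DelayedOuterJoin(5000L)), where the 5 second wait is an arbitrary choice. A record whose partner arrives after the wait still comes out twice (once padded with NA, once unmatched on the other side), so this only narrows the problem. Also note that with a finite source such as fromElements the job may finish before a processing-time timer fires, in which case the buffered unmatched records may never be emitted.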