
Why does Flink emit duplicate records on a DataStream join + Global window?


I'm learning/experimenting with Flink, and I'm observing some unexpected behavior with the DataStream join that I would like to understand...

Let's say I have two streams with 10 records each, which I want to join on an id field. Let's assume that each record in one stream has a matching one in the other, and that the IDs are unique within each stream. Let's also say I have to use a global window (requirement).

Join using DataStream API (my simplified code in Scala):

val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ... 

stream1
  .join(stream2)
  .where(_.id).equalTo(_.id)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1))
  .apply {
    (row1, row2) => // ... 
  }
  .print()

Result:

  • Everything is printed as expected: each record from the first stream is joined with its matching record from the second one.

However:

  • If I re-send one of the records (say, with an updated field) to its stream, two duplicate join events get emitted 😞
  • If I repeat that operation (with or without an updated field), I get 3 emitted events, then 4, 5, etc... 😞

Could someone in the Flink community explain why this is happening? I would have expected only 1 event emitted each time. Is it possible to achieve this with a global window?

In comparison, the Flink Table API behaves as expected in that same scenario, but for my project I'm more interested in the DataStream API.

Example with Table API, which worked as expected:

tableEnv
  .sqlQuery(
    """
      |SELECT *
      |  FROM stream1
      |       JOIN stream2
      |       ON stream1.id = stream2.id
    """.stripMargin)
  .toRetractStream[Row]
  .filter(_._1) // just keep the inserts
  .map(...)
  .print() // works as expected, after re-sending updated records

Thank you,

Nicolas


Solution

  • The issue is that records are never removed from your global window. So the join is triggered on the global window whenever a new record arrives, but the old records are still present and get joined again.
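
    As an aside, if you did not need the stream2 record to stay around for later stream1 records, you could simply wrap the trigger in a PurgingTrigger, which clears the whole window after each firing. A minimal sketch of that variant, reusing the stream1/stream2 fragments from your snippet:

    import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}

    stream1
      .join(stream2)
      .where(_.id).equalTo(_.id)
      .window(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of(1))) // purge all window state after every firing
      .apply((r1, r2) => (r1, r2))
      .print()

    However, that would also purge the stream2 record, so a stream1 record arriving afterwards would find nothing to join with.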

    Thus, to get it running in your case, you'd need to implement a custom evictor. I expanded your example into a minimal working example and added the evictor, which I will explain after the snippet.

    import java.lang

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.datastream.CoGroupedStreams
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.evictors.Evictor
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data1 = List(
      (1L, "myId-1"),
      (2L, "myId-2"),
      (5L, "myId-1"),
      (9L, "myId-1"))

    val data2 = List(
      (3L, "myId-1", "myValue-A"))

    val stream1 = env.fromCollection(data1)
    val stream2 = env.fromCollection(data2)

    stream1.join(stream2)
      .where(_._2).equalTo(_._2)
      .window(GlobalWindows.create()) // assume this is a requirement
      .trigger(CountTrigger.of(1))
      .evictor(new Evictor[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)], GlobalWindow]() {
        // nothing to do before the join; all eviction happens afterwards
        override def evictBefore(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}

        override def evictAfter(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
          import scala.collection.JavaConverters._
          // find the position of the latest element that came from the second input (the lookup value)
          val lastInputTwoIndex = elements.asScala.zipWithIndex.filter(e => e._1.getValue.isTwo).lastOption.map(_._2).getOrElse(-1)
          if (lastInputTwoIndex == -1) {
            println("Waiting for the lookup value before evicting")
            return
          }
          // remove everything except that lookup value, so records from the
          // first input cannot be joined a second time by a later firing
          val iterator = elements.iterator()
          for (index <- 0 until size) {
            val cur = iterator.next()
            if (index != lastInputTwoIndex) {
              println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
              iterator.remove()
            }
          }
        }
      })
      .apply((r, l) => (r, l))
      .print()

    env.execute()
    

    The evictor is applied after the window function (the join in this case) has run. It's not entirely clear how exactly your use case should behave when there are multiple entries in the second input, but for now, the evictor only handles a single entry there.

    Whenever a new element comes into the window, the window function is immediately triggered (count = 1). The join is then evaluated over all elements with the same key. Afterwards, to avoid duplicate outputs, we remove all entries of the first input from the current window. Since the second input may arrive after the first inputs, no eviction is performed while the second input is still empty. Note that my Scala is quite rusty; you will be able to write it in a much nicer way. The output of a run is:

    Waiting for the lookup value before evicting
    Waiting for the lookup value before evicting
    Waiting for the lookup value before evicting
    Waiting for the lookup value before evicting
    4> ((1,myId-1),(3,myId-1,myValue-A))
    4> ((5,myId-1),(3,myId-1,myValue-A))
    4> ((9,myId-1),(3,myId-1,myValue-A))
    evicting (1,myId-1)/null
    evicting (5,myId-1)/null
    evicting (9,myId-1)/null
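
    To mimic your re-send scenario with this pipeline, you could append an updated record for an already-joined id to the first input (a hypothetical extension of the data above):

    val data1 = List(
      (1L, "myId-1"),
      (2L, "myId-2"),
      (5L, "myId-1"),
      (9L, "myId-1"),
      (12L, "myId-1")) // the "re-sent" record with an updated field

    Since already-joined records of the first input are evicted right after each firing, every record should be joined exactly once in total, instead of producing the growing duplicates you observed.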
    

    A final remark: if the Table API already offers a concise way of doing what you want, I'd stick to it and then convert the result to a DataStream when needed.
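
    A rough sketch of that route (assuming tableEnv is your StreamTableEnvironment; the view names s1/s2 and the bridge import are my assumptions, and the exact imports depend on your Flink version):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    tableEnv.createTemporaryView("s1", stream1)
    tableEnv.createTemporaryView("s2", stream2)

    val joined: DataStream[Row] = tableEnv
      .sqlQuery("SELECT * FROM s1 JOIN s2 ON s1.id = s2.id")
      .toRetractStream[Row]
      .filter(_._1) // keep only the inserts
      .map(_._2)    // drop the retract flag and continue with the DataStream API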