scala apache-flink

How can countWindow and timeWindow be concatenated?


I am trying to set up a stream in which a countWindow runs first and its results are then passed to a separate timeWindow. The problem is that the timeWindow emits no results.

I have come up with a very simple piece of code that demonstrates the problem:

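import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)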

env
 .addSource(new RichSourceFunction[Int] {
  override def cancel(): Unit = {}

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
   var i = 0
   while (true) {
    println(s"Source emits element ${i}")
    ctx.collect(i)
    i = i + 1
    Thread.sleep(1000)
   }
  }
 })
 .keyBy(new KeySelector[Int, String] {
  override def getKey(value: Int): String = {
   println("getKey 1")
   "KEY1"
  }
 })
 .countWindow(2, 1)
 .reduce(new ReduceFunction[Int] {
  override def reduce(value1: Int, value2: Int): Int = {
   println("reduce 1")
   value1
  }
 })
 .keyBy(new KeySelector[Int, String] {
  override def getKey(value: Int): String = {
   println("getKey 2")
   "KEY2"
  }
 })
 .timeWindow(Time.seconds(5))
 .reduce(new ReduceFunction[Int] {
  override def reduce(value1: Int, value2: Int): Int = {
   println("reduce 2")
   value1
  }
 })
 .print()

env.execute()

With the above code, I would expect one element to be printed every 5 seconds. However, this is not the case. The actual output shows that the "print" function is reached only once:

Source emits element 0
getKey 1
getKey 2
getKey 2
1> 0
Source emits element 1
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 2
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 3
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 4
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 5
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 6
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 7
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 8
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 9
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 10
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 11
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 12
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2

Solution

  • Interesting example. If you change from IngestionTime to ProcessingTime, the example runs correctly.

    Looking around in the debugger, what I'm seeing is that with IngestionTime, the StreamRecords produced by the CountWindow no longer have valid timestamps, and so the TimeWindow is unable to function properly.

    To fix this, you need to re-establish the timestamps and watermarks after the CountWindow, with something like this:

      ...
      .countWindow(2, 1)
      .reduce(new ReduceFunction[Int] {
        override def reduce(value1: Int, value2: Int): Int = {
          println("reduce 1")
          value1
        }
      })
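      // Re-assign ingestion-time timestamps and watermarks to the countWindow output.
      // Needs: import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor[Int]())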
      .keyBy(new KeySelector[Int, String] {
        override def getKey(value: Int): String = {
          println("getKey 2")
          "KEY2"
        }
      })
      ...
    

    A similar technique would work with event time.
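
For the event-time case mentioned above, here is a minimal sketch of what re-establishing timestamps after the countWindow might look like. It is not taken from the original question: the `Reading` case class, its `eventTimeMillis` field, the sample `readings` data and the object name are all hypothetical, introduced only so that the event time travels inside the record and can be extracted again after the count window. It assumes a Flink 1.x release in which the Scala DataStream API still offers `timeWindow` and `assignAscendingTimestamps`, and it assumes the timestamps reaching the second stage are ascending.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type (not in the original question): the event time is
// stored in the element itself, so it can be read back after the count window.
case class Reading(value: Int, eventTimeMillis: Long)

object EventTimeAfterCountWindow {

  // Made-up sample input: one reading per second of event time, ascending.
  val readings: Seq[Reading] = (0 until 12).map(i => Reading(i, i * 1000L))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromCollection(readings)
      .keyBy((_: Reading) => "KEY1")
      .countWindow(2, 1)
      .reduce((first: Reading, _: Reading) => first)
      // Re-establish timestamps and watermarks from the field carried in the
      // record, replacing whatever timestamp the count window left behind.
      .assignAscendingTimestamps(_.eventTimeMillis)
      .keyBy((_: Reading) => "KEY2")
      .timeWindow(Time.seconds(5))
      .reduce((first: Reading, _: Reading) => first)
      .print()

    env.execute("event-time after countWindow (sketch)")
  }
}
```

If the events can arrive out of order, `assignAscendingTimestamps` is not a good fit; a watermark assigner that tolerates some lateness would be the safer choice there.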