Tags: apache-flink, flink-streaming

Processing time windows don't work on finite data sources in Apache Flink


I'm trying to apply a very simple window function to a finite data stream in Apache Flink (locally, no cluster). Here's the example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()

Here, I try to group all the elements that arrive within one second into a window and then just print these groups.

I assumed that all the elements would be produced in much less than one second and land in a single window, so print() would receive one group. However, nothing is printed at all when I run this.

If I remove all the windowing stuff, like

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()

I see the elements printed after the run. I also tried this with a file source; there was no difference.

The default parallelism on my machine is 6. If I experiment with the level of parallelism and delays, like this

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }
  // windowAll / trigger / process / print as above

I am able to get some, but not all, elements into groups, which are then printed.

My first assumption is that the source finishes much faster than one second and the task is shut down before the window's timer fires. Debugging showed that the line in ProcessingTimeTrigger that registers the timer is reached. Shouldn't all registered timers fire before a task shuts down (at least that's the impression I got from the code)?
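For what it's worth, the race I suspect can be sketched with a plain ScheduledExecutorService (just an analogy, not Flink's actual timer service; the `fired` flag and the 1-second delay are made up for illustration):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

val fired = new AtomicBoolean(false)
val scheduler = Executors.newSingleThreadScheduledExecutor()

// Register a "window timer" one second in the future.
scheduler.schedule(new Runnable {
  override def run(): Unit = fired.set(true)
}, 1, TimeUnit.SECONDS)

// The "job" finishes almost immediately and tears everything down;
// shutdownNow drops delayed tasks that have not started yet.
scheduler.shutdownNow()
scheduler.awaitTermination(2, TimeUnit.SECONDS)

println(s"timer fired: ${fired.get}") // timer fired: false
```

If the executor lived longer than the one-second delay, the timer would fire; shutting down first silently discards it, which matches what I observe.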

Could you please help me understand this and make this more deterministic?

Update #1, 23.09.2018:

I also experimented with event time windows instead of processing time windows. If I do this:

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })
  .print()

env.execute()

Again, nothing is printed. The debugger shows that the trigger's onElement is called for every element, but onEventTime is never called.

Also, if I modify the timestamp extractor to take bigger steps:

element.charAt(0).toInt * 1000

all the elements except the last are printed (one element per group, which is expected).
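The window assignment behind this can be checked with a bit of plain Scala: a record with timestamp t goes into the tumbling window starting at t - (t % size) for a zero offset (the `windowStart` helper below is mine, mirroring the formula in Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps). With the original extractor, the char codes 97..101 (milliseconds) all fall into the window [0, 1000); scaled by 1000, they fall into five distinct windows:

```scala
// Tumbling-window start for a zero-offset window; mirrors the formula in
// Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps.
def windowStart(timestamp: Long, windowSize: Long): Long =
  timestamp - (timestamp % windowSize)

val elements = List("a", "b", "c", "d", "e")
val size = 1000L // 1-second tumbling windows, in milliseconds

// Original extractor: char codes 97..101 ms all land in window [0, 1000).
val small = elements.map(e => windowStart(e.charAt(0).toInt.toLong, size)).distinct

// Scaled extractor: 97000..101000 ms spread across five distinct windows.
val big = elements.map(e => windowStart(e.charAt(0).toInt.toLong * 1000, size)).distinct

println(small) // List(0)
println(big)   // List(97000, 98000, 99000, 100000, 101000)
```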

Update #2, 23.09.2018:

Update #1 is answered in this comment.


Solution

  • When a finite source reaches its end: if you are using event time, a Watermark with timestamp Long.MAX_VALUE is injected, which causes all event-time timers to fire. With processing time, however, Flink only waits for timers that are already firing to complete their actions, and then exits; timers that are still pending never fire.

    As you suspected, you're not seeing any output because the source finishes very quickly.

    Deterministic behavior is straightforward with event time processing; with processing time it's not really achievable.

    But here's a hack that more or less works:

    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val s = env.fromCollection(List("a", "b", "c", "d", "e"))

    // An endless dummy source keeps the job alive so that the
    // processing-time timers get a chance to fire.
    val t = env.addSource((context: SourceContext[String]) => {
      while (true) {
        Thread.sleep(100)
        context.collect("dummy")
      }
    })

    s.union(t)
      .filter(_ != "dummy")
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          out.collect(elements.toList.sorted.toString())
        }
      })
      .print()

    env.execute()
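The end-of-input behavior described in the first point can be modeled in a few lines of plain Scala (a toy model of the described semantics, not Flink's internals; `fire` and the example timer values are made up): event-time timers at or below the current watermark fire, so the final Long.MAX_VALUE watermark fires everything still pending.

```scala
// A toy model of event-time timer semantics: timers at or below the
// watermark fire; the rest stay pending. Not Flink's implementation.
def fire(timers: List[Long], watermark: Long): (List[Long], List[Long]) =
  timers.partition(_ <= watermark)

// Say three 1-second windows have registered end-of-window timers:
val timers = List(1000L, 2000L, 5000L)

// While the source is still running, the watermark has only reached 1500:
val (fired1, pending1) = fire(timers, 1500L)
println(fired1) // List(1000)

// When the finite source ends, a Long.MAX_VALUE watermark is emitted,
// which fires every remaining event-time timer:
val (fired2, pending2) = fire(pending1, Long.MaxValue)
println(fired2)   // List(2000, 5000)
println(pending2) // List()
```

This is why the event-time variant is deterministic on a finite source, while processing-time timers simply lapse when the job exits first.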