Tags: apache-flink, flink-streaming, sliding-window

Flink SlidingEventTimeWindows doesn't work as expected


I have a stream execution configured as

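import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkSlidingEventTimeExample extends App {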
    case class Trx(timestamp:Long, id:String, trx:String, count:Int)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val watermarkS1 = WatermarkStrategy
      .forBoundedOutOfOrderness[Trx](Duration.ofSeconds(15))
      .withTimestampAssigner(new SerializableTimestampAssigner[Trx] {
          override def extractTimestamp(element: Trx, recordTimestamp: Long): Long = element.timestamp
      })
    val s1 = env.socketTextStream("localhost", 9999)
      .flatMap(l => l.split(" "))
      .map(l => Trx(timestamp = l.split(",")(0).toLong, id = l.split(",")(1), trx = l.split(",")(2), count = 1))
      .assignTimestampsAndWatermarks(watermarkS1)
      .keyBy(l => l.id)
      .window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5))) // Not working
      //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))) // Working
      .sum("count")
      .print
    env.execute("FlinkSlidingEventTimeExample")
}

I have already defined a watermark strategy, but I can't figure out why the job isn't producing any output. Does anyone have any ideas? My Flink version is 1.14.0.

My build.sbt is like below:

scalaVersion := "2.12.15"



libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-runtime-web" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0"
libraryDependencies += "org.apache.flink" % "flink-queryable-state-runtime" % "1.14.0"

I am entering input data through a socket (port 9999) like this:

1640375790000,1,trx1
1640375815000,1,trx2
1640375841000,1,trx3
1640375741000,1,trx4

I tried entering timestamps that are further apart than the window size, but it still doesn't produce any output.

Flink Web UI screenshot: web-ui watermarks


Solution

  • Earlier answer deleted; it was based on faulty assumptions about the setup.

    When event time windows fail to produce results it's always something to do with watermarking.

    The timestamps in your input correspond to (in UTC)

    December 24, 2021 19:56:30
    December 24, 2021 19:56:55
    December 24, 2021 19:57:21
    December 24, 2021 19:55:41
    

    so there is more than enough data to close several sliding windows. For example, trx2 (19:56:55) can produce a watermark of 19:56:39.999 (its timestamp minus the 15-second out-of-orderness bound, minus 1 ms), which is large enough to close these two windows that contain 19:56:30:

    19:56:15 - 19:56:34.999
    19:56:20 - 19:56:39.999
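
    The arithmetic behind those two windows can be checked with a small, Flink-free sketch. It assumes a window offset of 0, a bounded-out-of-orderness watermark of "largest timestamp seen, minus the bound, minus 1 ms", and an event-time trigger that fires once the watermark reaches the window's last millisecond. The object and method names are hypothetical and exist only for illustration.

    ```scala
    object SlidingWindowSketch {
      val sizeMs = 20000L           // window size from the question (20 s)
      val slideMs = 5000L           // window slide from the question (5 s)
      val outOfOrdernessMs = 15000L // bound from the question (15 s)

      // All [start, end) sliding windows that contain `ts`, earliest first (offset 0 assumed).
      def windowsFor(ts: Long): List[(Long, Long)] = {
        val lastStart = ts - (ts % slideMs)
        Iterator.iterate(lastStart)(_ - slideMs)
          .takeWhile(_ > ts - sizeMs)
          .map(start => (start, start + sizeMs))
          .toList
          .reverse
      }

      // Watermark after the largest timestamp seen so far is `maxSeenTs`.
      def watermarkAfter(maxSeenTs: Long): Long = maxSeenTs - outOfOrdernessMs - 1

      // A window can fire once the watermark reaches its last millisecond (end - 1).
      def fires(window: (Long, Long), watermark: Long): Boolean = watermark >= window._2 - 1
    }
    ```

    With trx1 = 1640375790000 and trx2 = 1640375815000, `windowsFor(trx1)` yields four windows, and `watermarkAfter(trx2)` is enough for `fires` to return true for the first two of them, matching the list above.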
    

    However, your execution graph looks something like this:

    [Image: execution graph]

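    The problem is the rebalance between the socket source and the task that follows (the one doing flatMap -> map -> watermarks). Each of your four events is going to a different instance of the watermark strategy, and some instances aren't receiving any events. The window operator's watermark is the minimum of the watermarks from all of its upstream instances, so the instances without events hold it back and it never advances. That's why no windows are being triggered.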
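
    The effect can be illustrated with a simplified, Flink-free model. It assumes a parallelism of 8 (a hypothetical value; a local environment typically defaults to the number of CPU cores), plain round-robin distribution starting at the first instance, and a downstream watermark equal to the minimum across all upstream instances. The names below are illustrative only and are not part of the Flink API.

    ```scala
    object WatermarkMinSketch {
      val outOfOrdernessMs = 15000L // bound from the question (15 s)

      // Watermark one generator instance would reach for the events it saw.
      // An instance that saw nothing stays at Long.MinValue.
      def subtaskWatermark(events: Seq[Long]): Long =
        if (events.isEmpty) Long.MinValue else events.max - outOfOrdernessMs - 1

      // Simplified rebalance: event i goes to instance i % parallelism.
      def rebalance(events: Seq[Long], parallelism: Int): Seq[Seq[Long]] =
        (0 until parallelism).map { i =>
          events.zipWithIndex.collect { case (e, idx) if idx % parallelism == i => e }
        }

      // The downstream operator only advances to the minimum of its inputs' watermarks.
      def downstreamWatermark(events: Seq[Long], parallelism: Int): Long =
        rebalance(events, parallelism).map(subtaskWatermark).min
    }
    ```

    In this model, the four timestamps from the question spread over 8 instances leave the downstream watermark at `Long.MinValue`, while a parallelism of 1 lets it advance past the end of the earliest windows.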

    What you want to do instead is to chain the input parsing and watermark generation to the source at the same parallelism, so that your execution graph looks like this instead:

    [Image: execution graph]

    This code will do that:

    env
      .socketTextStream("localhost", 9999)
      .map(l => {
        val input = l.split(",")
        Trx(timestamp = input(0).toLong, id = input(1), trx = input(2), count = 1)
      })
      .setParallelism(1)
      .assignTimestampsAndWatermarks(watermarkS1)
      .setParallelism(1)
      .keyBy(l => l.id)
      .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
      .sum("count")
      .print
    

    In general it's not necessary to do watermarking at a parallelism of one, but it is necessary that every instance of the watermark generator either has enough events to work with, or is configured with withIdleness. (And if every instance is idle then you won't get any results either.)
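
    For the withIdleness route, a watermark strategy along these lines might work. This is only a sketch: the 30-second idleness timeout is an assumed value, not one taken from the question, and as noted above it will not help if every instance goes idle (for example, when only four events are ever sent).

    ```scala
    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    object IdlenessSketch {
      case class Trx(timestamp: Long, id: String, trx: String, count: Int)

      val watermarkWithIdleness: WatermarkStrategy[Trx] = WatermarkStrategy
        .forBoundedOutOfOrderness[Trx](Duration.ofSeconds(15))
        .withTimestampAssigner(new SerializableTimestampAssigner[Trx] {
          override def extractTimestamp(element: Trx, recordTimestamp: Long): Long = element.timestamp
        })
        // Assumed timeout: an instance that sees no events for 30 s is marked idle,
        // so it no longer holds back the downstream watermark.
        .withIdleness(Duration.ofSeconds(30))
    }
    ```

    This strategy would be passed to assignTimestampsAndWatermarks in place of watermarkS1.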