Tags: scala, apache-flink, flink-streaming

Flink data is not processed by the process function in a timeWindow operator


I have a time window with which I am trying to determine whether a new key appears over a period of time. I am pushing data via Kafka, and when I debug, I can see that the data reaches the keyBy, but it never reaches the process method and nothing is collected by the collector. I am using BoundedOutOfOrdernessTimestampExtractor to assign watermarks:

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    case class Src(qip: Ip, ref: Ip, ts: Long) extends FooRequest

    class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
      override def extractTimestamp(element: Src): Long = element.ts
    }

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow] {
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        println(s"RefIpFilter processing $key") // data is not getting here
        if (Option(context.windowState.getState(stateDescriptor).value()).isEmpty) {
          println(s"new key found $key") // data is not getting here either
          context.windowState.getState(stateDescriptor).update(key)
          out.collect(elements.head)
        }
      }
    }

    lazy val env: StreamExecutionEnvironment =
      setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000, Some(stateDir), Some(TimeCharacteristic.EventTime))

    lazy val src: DataStream[FooRequest] = env.addSource(consumer)

    lazy val uniqueRef: DataStream[FooRequest] => DataStream[Src] = src => src
      .flatMap(new FlatMapFunction[FooRequest, Src] {
        override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
          case r: Src =>
            out.collect(r)
          case invalid =>
            log.warn(s"filtered unexpected request $invalid")
        }
      })
      .assignTimestampsAndWatermarks(new TsExtractor)
      .keyBy(r => r.ref)
      .timeWindow(Time.seconds(120))
      .allowedLateness(Time.seconds(360))
      .process(new RefFilter)

    uniqueRef(src).addSink(sink)
    env.execute()

Any assistance will be greatly appreciated.


Solution

  • The BoundedOutOfOrdernessTimestampExtractor keeps track of the highest timestamp it has seen so far and produces watermarks that trail behind it by the configured delay (three hours, in this case). These watermarks are emitted periodically, every 200 ms by default. So with only a single event, the watermark will sit 3 hours behind that event, and the window will never be triggered. Furthermore, with finite input, the job stops running once it has processed all of the events. See the first sketch after this list.

  • context.windowState is per-window state with a limited lifetime: each 2-minute window gets its own instance, and it is cleared once the window's allowed lateness has expired. If you want keyed window state with a global scope and an indefinite lifetime, use context.globalState instead, as in the second sketch below.
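
To illustrate the first point, here is a minimal sketch that reuses the question's Src class and env. It shrinks the out-of-orderness bound to seconds, so a small test stream can actually advance the watermark past a window's end, and shows where the 200 ms emission interval is configured; the name ShortTsExtractor is hypothetical:

    // Hypothetical variant of the question's TsExtractor with a much smaller
    // bound: the watermark now trails the highest seen timestamp by only
    // 10 seconds, so a 2-minute window can close with a modest test stream.
    class ShortTsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.seconds(10)) {
      override def extractTimestamp(element: Src): Long = element.ts
    }

    // Periodic watermark emission defaults to 200 ms in event time; it can be
    // tuned on the environment's ExecutionConfig if needed.
    env.getConfig.setAutoWatermarkInterval(200L)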
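
And a sketch of the second point, assuming the question's RefFilter is otherwise unchanged: the only difference is reading and writing the descriptor through context.globalState, which is scoped to the key rather than to a single window instance.

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow] {
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        // globalState survives across windows for this key, so the emptiness
        // check really means "has this key ever been seen before".
        val seen = context.globalState.getState(stateDescriptor)
        if (Option(seen.value()).isEmpty) {
          seen.update(key)
          out.collect(elements.head) // emit only the first element ever seen for this key
        }
      }
    }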