I have a time window that I use to determine whether a new key arrives within a period of time. I am pushing data via Kafka, and when I debug it, I see that the data reaches the keyBy method but never reaches the process method and is never collected by the collector. I am using BoundedOutOfOrdernessTimestampExtractor for assigning watermarks:
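import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Ip, FooRequest, IpDetectionSrc, setupEnv, stateDir, consumer, sink and log are defined elsewhere.
case class Src(qip: Ip, ref: Ip, ts: Long) extends FooRequest

class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)) {
  override def extractTimestamp(element: Src): Long = element.ts
}

class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow] {
  private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
  override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
    println(s"RefIpFilter processing $key") // data is not getting here
    if (Option(context.windowState.getState(stateDescriptor).value()).isEmpty) {
      println(s"new key found $key") // data is not getting here either
      context.windowState.getState(stateDescriptor).update(key)
      out.collect(elements.head)
    }
  }
}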
lazy val env: StreamExecutionEnvironment =
  setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000, Some(stateDir), Some(TimeCharacteristic.EventTime))

lazy val src: DataStream[FooRequest] = env.addSource(consumer)

lazy val uniqueRef: DataStream[FooRequest] => DataStream[Src] = src => src
  .flatMap(new FlatMapFunction[FooRequest, Src] {
    override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
      case r: Src =>
        out.collect(r)
      case invalid =>
        log.warn(s"filtered unexpected request $invalid")
    }
  })
  .assignTimestampsAndWatermarks(new TsExtractor)
  .keyBy(r => r.ref)
  .timeWindow(Time.seconds(120))
  .allowedLateness(Time.seconds(360))
  .process(new RefFilter)

uniqueRef(src).addSink(sink)
env.execute()
Any assistance will be greatly appreciated.
The BoundedOutOfOrdernessTimestampExtractor keeps track of the highest timestamp it has seen so far and produces watermarks that trail behind it by the configured delay (three hours, in this case). These watermarks are produced periodically, every 200 ms by default. An event-time window only fires once the watermark reaches the end of the window, so with only a single event the watermark stays 3 hours behind that event and the window is never triggered. A given 2 minute window fires only after an event arrives whose timestamp is roughly three hours past the end of that window. Furthermore, with finite input, the job will stop running once it has processed all of the events.
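The firing condition comes down to a little arithmetic. The sketch below is a plain-Scala model, not Flink code: it assumes Flink's rule that an event-time window fires once the watermark reaches the window's end minus 1 ms, it reuses the 3 hour bound and 120 second windows from the question, and the names WatermarkModel, watermark, windowEnd and fires are hypothetical.

```scala
// Plain-Scala model (no Flink classes) of a bounded-out-of-orderness
// watermark driving a tumbling event-time window.
object WatermarkModel {
  val maxOutOfOrdernessMs: Long = 3L * 60 * 60 * 1000 // like Time.hours(3)
  val windowSizeMs: Long = 120L * 1000                 // like Time.seconds(120)

  // The watermark trails the highest timestamp seen so far by the delay.
  // Assumes at least one timestamp has been seen.
  def watermark(timestampsSeen: Seq[Long]): Long =
    timestampsSeen.max - maxOutOfOrdernessMs

  // Exclusive end of the tumbling window containing ts (assumes ts >= 0).
  def windowEnd(ts: Long): Long =
    ts - (ts % windowSizeMs) + windowSizeMs

  // The window fires once the watermark reaches its last millisecond (end - 1).
  def fires(eventTs: Long, timestampsSeen: Seq[Long]): Boolean =
    watermark(timestampsSeen) >= windowEnd(eventTs) - 1
}
```

With these numbers, the window [0, 120000) ms fires only once some event with a timestamp of at least 10,919,999 ms has been seen, i.e. three hours plus 119,999 ms after the window starts; a single event, or a burst of events a few minutes apart, leaves it pending.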
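context.windowState is per-window state with a limited lifetime. Each 2 minute window has its own instance, meant to live only until the allowed lateness for that window has expired (the Flink documentation recommends clearing such state in the function's clear() method, which is called at that point). As a result, state written in one window is never visible in the next, so the isEmpty check in RefFilter succeeds again in every new window. If you want keyed state with a global scope and an indefinite lifetime, use context.globalState instead.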
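The effect on the "new key" check can be modeled without Flink. In this hypothetical sketch (StateScopeModel and newKeyCount are illustrative names, and the model ignores state cleanup), per-window state is scoped to a (key, window) pair while global state is scoped to the key alone, so a key present in three consecutive windows passes the check three times in the first case and once in the second.

```scala
import scala.collection.mutable

// Plain-Scala model (not Flink code) of the "new key" check in RefFilter.
// Per-window state is scoped to (key, window); global state to the key alone.
object StateScopeModel {
  // Counts how often the "new key found" branch would run for one key that
  // is present in each listed window (windows identified by their start).
  def newKeyCount(key: String, windowStarts: Seq[Long], perWindow: Boolean): Int = {
    val windowScoped = mutable.Set.empty[(String, Long)] // like windowState
    val keyScoped = mutable.Set.empty[String]            // like globalState
    var count = 0
    for (start <- windowStarts) {
      // Set.add returns true only if the entry was absent, i.e. the state was empty.
      val wasEmpty =
        if (perWindow) windowScoped.add((key, start))
        else keyScoped.add(key)
      if (wasEmpty) count += 1
    }
    count
  }
}
```

In RefFilter itself, the corresponding change would be to read and update context.globalState.getState(stateDescriptor) rather than context.windowState.getState(stateDescriptor).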