I'm trying to aggregate two streams like this:
```scala
val joinedStream = finishResultStream.keyBy(_.searchId)
  .connect(startResultStream.keyBy(_.searchId))
  .process(new SomeCoProcessFunction)
```
and then I process them in a `SomeCoProcessFunction` class like this:
```scala
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {

  override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
    // aggregating some "finished" data ...
  }

  override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
    val timerService = ctx.timerService()
    timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)
    // aggregating some "created" data ...
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {
    val watermark: Long = ctx.timerService().currentWatermark()
    println(s"watermark!!!! $watermark")
    // clean up the state
  }
}
```
What I want is to clean up the state after a certain time (5000 milliseconds), which is what `onTimer` is meant for. But since it never gets fired, I wonder what I am doing wrong here.

Thanks in advance for any hint.
UPDATE:
The solution was to register the timer like this (thanks to both fabian-hueske and Beckham):
```scala
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)
```
I still haven't really figured out what `timerService.registerEventTimeTimer` does. The watermark returned by `ctx.timerService().currentWatermark()` always shows -9223372036854775808, no matter how much time has passed since the event-time timer was registered.
The problem is that you are registering an event-time timer (`timerService.registerEventTimeTimer`) with a processing-time timestamp (`System.currentTimeMillis + 5000`).
`System.currentTimeMillis` returns the current machine time, but event time is not based on the machine time; it is based on the time computed from watermarks.
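To make the two clocks concrete, here is a toy model in plain Scala. It is a simplified illustration, not Flink's implementation, and `ToyTimerService`, `advanceWatermark`, and `ToyTimerDemo` are hypothetical names. It mimics the rule that an event-time timer fires only once the watermark reaches the timer's timestamp. Before any watermark arrives, Flink reports `Long.MinValue`, which is exactly the -9223372036854775808 from the update, so a timer set to "now + 5 s" on the machine clock never becomes due.

```scala
// Toy model only, not Flink's implementation: an event-time timer is due once
// the watermark has reached its timestamp (timestamp <= watermark).
final class ToyTimerService {
  // Flink reports Long.MinValue until the first watermark arrives.
  private var currentWatermark: Long = Long.MinValue
  private var pending: List[Long] = Nil

  def watermark: Long = currentWatermark

  def registerEventTimeTimer(timestamp: Long): Unit =
    pending = timestamp :: pending

  /** Advances the event-time clock and returns the timers that fire, in ascending order. */
  def advanceWatermark(newWatermark: Long): List[Long] = {
    // Watermarks never move backwards.
    currentWatermark = math.max(currentWatermark, newWatermark)
    val (due, later) = pending.partition(_ <= currentWatermark)
    pending = later
    due.sorted
  }
}

object ToyTimerDemo {
  def main(args: Array[String]): Unit = {
    val timers = new ToyTimerService

    // The question's bug: a wall-clock timestamp on the event-time clock.
    // Without watermarks the clock never moves, so this timer never fires.
    timers.registerEventTimeTimer(System.currentTimeMillis() + 5000)
    println(timers.watermark) // -9223372036854775808

    // The fix: a timestamp on the same clock as the watermarks.
    val recordTs = 1000L // hypothetical event timestamp of a SearchCreated record
    timers.registerEventTimeTimer(recordTs + 5000)
    println(timers.advanceWatermark(5999L)) // List()
    println(timers.advanceWatermark(6000L)) // List(6000)
  }
}
```

In a real job the watermarks come from timestamps and watermarks assigned upstream. If none are assigned, the watermark stays at `Long.MinValue` and event-time timers never fire.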
Either register a processing-time timer, or register an event-time timer with an event-time timestamp. You can get the timestamp of the current watermark (`ctx.timerService().currentWatermark()`) or the timestamp of the current record (`ctx.timestamp()`) from the `Context` object that is passed as a parameter to `processElement1()` and `processElement2()`.
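Both options can be sketched in the question's own function, assuming the Flink 1.x Scala/DataStream API. The case classes are hypothetical placeholders for the asker's types. `CleanupDelayMs` and the `useEventTime` switch are also hypothetical, added only to show the two alternatives side by side. `ctx.timestamp()` returns a boxed `java.lang.Long` that is `null` when no timestamps were assigned upstream, so the event-time branch checks for that.

```scala
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical placeholder types standing in for the question's own classes.
case class SearchFinished(searchId: String)
case class SearchCreated(searchId: String)
case class SearchAggregated(searchId: String)

object SomeCoProcessFunction {
  // Hypothetical name for the 5000 ms retention period from the question.
  val CleanupDelayMs: Long = 5000L
}

// useEventTime is a hypothetical switch selecting one of the two options.
class SomeCoProcessFunction(useEventTime: Boolean = false)
    extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {
  import SomeCoProcessFunction.CleanupDelayMs

  override def processElement1(
      finished: SearchFinished,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context,
      out: Collector[SearchAggregated]): Unit = {
    // aggregating some "finished" data ...
  }

  override def processElement2(
      created: SearchCreated,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context,
      out: Collector[SearchAggregated]): Unit = {
    val timerService = ctx.timerService()
    if (useEventTime) {
      // Option 2: event-time timer relative to the record's event timestamp.
      // It fires only once the watermark passes recordTs + CleanupDelayMs.
      val recordTs = ctx.timestamp()
      if (recordTs != null) {
        timerService.registerEventTimeTimer(recordTs.longValue() + CleanupDelayMs)
      }
    } else {
      // Option 1: processing-time timer relative to the machine clock.
      timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + CleanupDelayMs)
    }
    // aggregating some "created" data ...
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext,
      out: Collector[SearchAggregated]): Unit = {
    // timeDomain() reports whether an event-time or processing-time timer fired.
    val domain: TimeDomain = ctx.timeDomain()
    println(s"$domain timer fired at $timestamp")
    // clean up the state
  }
}
```

With the default `useEventTime = false`, this matches the fix from the update. The event-time branch only works if the input streams carry timestamps and watermarks.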