Search code examples
apache-flinkflink-streaming

Flink's CoProcessFunction doesn't trigger onTimer


I try to aggregate two streams like that

val joinedStream = finishResultStream.keyBy(_.searchId)
  .connect(startResultStream.keyBy(_.searchId))
  .process(new SomeCoProcessFunction)

and then working on them in SomeCoProcessFunction class like that

class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {

   override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = { 

       // aggregating some "finished" data ...

   }

   override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {

       val timerService = ctx.timerService()
       timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)

       // aggregating some "created" data ...
   }

   override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {

       val watermark: Long = ctx.timerService().currentWatermark()
       println(s"watermark!!!! $watermark")

       // clean up the state

   }

What I want is to clean up the state after a certain time( 5000 Milliseconds), and that is what onTimer have to be used for. But since it never get fired, I kinda ask my self what am I doing wrong here?

Thanks in advance for any hint.

UPDATE:

Solution was to set timeService like that (tnx to both fabian-hueske and Beckham):

timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)

I still didn't really figure out what timerService.registerEventTimeTimer does, watermark ctx.timerService().currentWatermark() shows always -9223372036854775808 now matter how long before EventTimer was registered.


Solution

  • The problem is that you are registering an event-time timer (timerService.registerEventTimeTimer) with a processing-time timestamp (System.currentTimeMillis + 5000).

    System.currentTimeMillis returns the current machine time but event-time is not based on the machine time but on the time computed from watermarks.

    Either you should register a processing-timer or register an event-time timer with an event-time timestamp. You can get the timestamp of the current watermark or the timestamp of the current record from the Context object that is passed as a parameter to processElement1() and processElement2().